Re: aggregating over triggered results

2019-11-01 Thread Robert Bradshaw
On Thu, Oct 31, 2019 at 8:48 PM Aaron Dixon  wrote:
>
> First of all thank you for taking the time on this very clear and helpful 
> message. Much appreciated.
>
> >I suppose one could avoid doing any pre-aggregation, and emit all of
> the events (with reified timestamp) in 60/30-day windows, then have a
> DoFn that filters on the events and computes each of the 10-minute
> aggregates over the "true" sliding window (4320 outputs). This could
> be cheaper if your events are very sparse, will be more expensive if
> they're very dense, and it's unclear what the tradeoff will be.
>
> This is exactly what I was doing (trying to do), reify the events and filter 
> them out to compute my own desired window for the trigger. I have lots of 
> events but each key has few events (in the thousands) but I think your point 
> is that even this is not a win, the events overall would have to be quite 
> sparse for it to be a win and by how much. So I can see why this is perhaps 
> not a great thread to pursue.
>
> On another note, trying to use *periodic* triggers like this in 
> *intermediate* pipeline stages and leverage them in downstream aggregations 
> was something I was trying to do here and in a few other cases. (I'm new to 
> Beam and triggers seemed fundamental so I expected to not get so lost trying 
> to use them this way.) But at least at this stage of my understanding I think 
> this was misplaced... periodic triggers seem primarily important say at the 
> last stage of a pipeline where you may be writing updates to an actual 
> sink/table.
>
> In other words suppose the above (60/30 day sliding) approach turned out to 
> be more efficient... I still have no idea if, using Beam, I'd be able to 
> properly regroup on the other side and pick out all the "latest triggered" 
> events from the rest... or even know when I've got them. This was the source 
> of my original question, but I'm now just thinking this is just not what 
> people do in Beam pipelines... periodically trigger windows _in the middle_ 
> of a pipeline. Am I on the right track in this thinking? If so, I wonder if 
> the API would better reflect this? If it's a doomed strategy to try to 
> periodically trigger 'into' downstream aggregations, why is the API so 
> friendly to doing just this?

Yes, see e.g. 
https://docs.google.com/document/d/17H2sBEtnoTSxjzlrz7rmKtX5E3F0mW1NpFQzWzSYOpY
. As an intermediate point (and stepping stone) we want to at least
have retractions:
https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE
. Triggering is an advanced, and somewhat thorny (and not as fleshed
out) concept (e.g. it introduced non-determinism). It's basically
trying to solve them problem of seeing versions of aggregations that
are not gated by the watermark (either early, before the watermark has
declared that you've seen all the data, or late, in case the watermark
was wrong (watermarks can be heuristic as perfect certainty might be
to slow/expensive)).

> On Wed, Oct 30, 2019 at 5:37 PM Robert Bradshaw  wrote:
>>
>> On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon  wrote:
>> >
>> > Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and 
>> > meant to hit user@, but as we're here could you clarify your two points, 
>> > however--
>>
>> No problem. This is veering into dev@ territory anyway :).
>>
>> > 1) I am under the impression that the 4,000 sliding windows approach (30 
>> > days every 10m) will re-evaluate my combine aggregation every 10m whereas 
>> > with the two-window approach my Combine aggregation would evolve 
>> > iteratively, only merging new results into the aggregation.
>> >
>> > If there's a cross-window optimization occurring that would allow 
>> > iterative combining _across windows_, given the substantial order of 
>> > magnitude difference in scale at play, is it safe to consider such 
>> > 'internal optimization detail' part of the platform contract (Dataflow's, 
>> > say)? Otherwise it would be hard to lean on this from a production system 
>> > that will live into the future.
>>
>> OK, let's first define exactly what (I think) you're trying to
>> compute. Let's label windows by their (upper) endpoint. So, every 10
>> minutes you have a window W_t and an aggregate Aggr(e for e in
>> all_events if t - 60days <= timestamp(e) < t).
>>
>> The way this is computed in Beam is by storing a map W_t ->
>> RunningAggregate and whenever we see an element with timestamp T we
>> assign it to the set of windows S = {W_t : T in W_t} (in this case
>

Re: Python SDK timestamp precision

2019-11-01 Thread Robert Bradshaw
On Fri, Nov 1, 2019 at 2:17 AM Jan Lukavský  wrote:
>
>  > Yes, this is the "minus epsilon" idea, but assigning this as a bit on
> the WindowedValue rather than on the Timestamp itself. This means that
> pulling the timestamp out then re-assigning it would be lossy. (As a
> basic example, imaging the batching DoFn that batches up elements (with
> their respective metadata), calls an external service on the full batch,
> and then emits the results (with the appropriate, cached, metadata).
>
> I'm not sure it I follow - I'd say that in the example the "limit bit"
> could be part of the metadata, so that if the external service returns
> response, the limiting bit could be be added back. Actually a PTransform
> for manipulating of this limit bit should be accessible by users,
> because it can be useful when reading data from external service (e.g.
> kafka).
>
> Yes, the current approach of subtracting 1 (millisecond, nanosecond,
> picosecond, whatever) ensures that we don't have to hold this metadata,
> because it is part of the timestamp. But we are not modeling the reality
> correctly. In reality the output for a window with range
> <02:00:00,03:00:00) (half-closed) cannot happen *before* 03:00:00. Yes,
> it's validity interval might depend on a precision -
> <02:00:00-02:59:59.999> in case of milliseconds
> <02:00:00-02:59:59.9> in case of nanoseconds - that is only due
> to numerical limitations. Neither of that is correct and that is what
> brings this timestamp precision into question. This only arises from the
> fact, that we cannot preserve causality of events with the same
> (numerical value of) timestamp correctly. That's why I'd say if would be
> better to actually fix how we treat the reality. Whether it would be by
> creating Beam's Timestamp with this ability, or adding this to
> WindowedValue along with the other metadata (timestamp, window, pane,
> ...) is an implementation detail. Still there is a question if we can
> actually do that, because that would affect output timestamps of user
> transforms (correct them actually, but even though it is a breaking change).

This is point (3) above--I think it would be very surprising if
extracting and then immediately setting the timestamp is anything but
a no-op. If native libraries (like Java Instant) had a notion of
minus-epsilon than I would want to use it.

> On 10/31/19 6:07 PM, Robert Bradshaw wrote:
> > On Thu, Oct 31, 2019 at 1:49 AM Jan Lukavský  wrote:
> >
> >>   > This is quite an interesting idea. In some sense, timestamps become
> >> like interval windows, and window assignment is similar to the window
> >> mapping fns that we use for side inputs. I still think the idea of a
> >> timestmap for an element and a window for an element is needed (e.g. one
> >> can have elements in a single window, especially the global window that
> >> have different timestamps), but this could be interesting to explore. It
> >> could definitely get rid of the "minus epsilon" weirdness, though I
> >> don't think it completely solves the granularity issues.
> >>
> >> Thinking about this a little more - maybe we actually don't need a time
> >> interval (not sure), it might be sufficient to actually add a single bit
> >> to the WindowedValue. That bit would be "the timestamp it PRECISELY this
> >> one" or "the timestamp is limiting value". This flag would have to
> >> propagate to timer settings (that is "set timer to exactly timestamp T"
> >> or "set timer as close to T as possible"). Then window timers at the end
> >> of window would set timers with this "limiting" setting (note that
> >> window.maxTimestamp() would have to change definition to be the first
> >> timestamp strictly greater than any timestamp that belongs to the window
> >> - it will actually be the first timestamp of new window with "limit"
> >> flag on). The impact on timers would be that events fired from @OnTimer
> >> would just propagate the flag to the WindowedValue being output.
> >>
> >> That way it seems to not matter how SDK internally handles time
> >> precision, as it would be transparent (at least seems to me). It is
> >> actually precisely what you proposed as "minus epsilon", only taken to
> >> the extreme. Looks useful to me and seems not that hard to implement.
> >> Although it would be of course a somewhat breaking change, because
> >> outputs of windows would become "3:00:00.000" instead of "2:59:59.999"
> >> (but I

Re: Rethinking the Flink Runner modes

2019-10-31 Thread Robert Bradshaw
Yes. If someone starts up the job server manually, they would have to
manually specify LOOPBACK if they want it. Python's FlinkRunner does
not use a pre-configured job server, it starts one up itself (making
the default scenario simple).

On Thu, Oct 31, 2019 at 10:19 AM Thomas Weise  wrote:
>
> True, but it would probably be a good tradeoff to make the default scenario 
> simple (just for FlinkRunner, not for PortableRunner). If someone configures 
> the job server with options, they probably also know how to control the 
> environment?
>
>
> On Thu, Oct 31, 2019 at 9:09 AM Maximilian Michels  wrote:
>>
>> > When the FlinkRunner (python client) sees flink_master as [auto] or 
>> > [local], then it could set the default environment to LOOPBACK before the 
>> > pipeline is constructed and provide the loopback environment. Isn't that 
>> > fully client side controlled?
>>
>> There is the case of a pre-configured job server with a master url. In
>> this case we probably do not want LOOPBACK execution because the job
>> server will submit the pipeline to a an actual cluster. If we make the
>> assumption that we do not allow that for the Python FlinkRunner, setting
>> LOOPBACK in this case seems fair.
>>
>> On 31.10.19 16:04, Thomas Weise wrote:
>> >
>> >
>> > On Thu, Oct 31, 2019 at 3:55 AM Maximilian Michels > > > wrote:
>> >
>> >  > Thanks for clarifying. So when I run "./flink my_pipeline.jar"  or
>> >  > upload the jar via the REST API (and its main method invoked on the
>> >  > master) then [auto] reads the config and does the right thing, but 
>> > if
>> >  > I do java my_pipeline.jar it'll run locally.
>> >
>> > Correct.
>> >
>> >  > Python needs to know even whether to start up the loopback
>> > server, and
>> >  > provide the address when submitting the pipeline.
>> >
>> > I was thinking, it could do this anyway and tear down that server if
>> > the
>> > Runner does not need it. Clearly not the ideal solution.
>> >
>> >
>> > When the FlinkRunner (python client) sees flink_master as [auto] or
>> > [local], then it could set the default environment to LOOPBACK before
>> > the pipeline is constructed and provide the loopback environment. Isn't
>> > that fully client side controlled?
>> >


Re: Python SDK timestamp precision

2019-10-31 Thread Robert Bradshaw
On Thu, Oct 31, 2019 at 1:49 AM Jan Lukavský  wrote:

>  > This is quite an interesting idea. In some sense, timestamps become
> like interval windows, and window assignment is similar to the window
> mapping fns that we use for side inputs. I still think the idea of a
> timestmap for an element and a window for an element is needed (e.g. one
> can have elements in a single window, especially the global window that
> have different timestamps), but this could be interesting to explore. It
> could definitely get rid of the "minus epsilon" weirdness, though I
> don't think it completely solves the granularity issues.
>
> Thinking about this a little more - maybe we actually don't need a time
> interval (not sure), it might be sufficient to actually add a single bit
> to the WindowedValue. That bit would be "the timestamp it PRECISELY this
> one" or "the timestamp is limiting value". This flag would have to
> propagate to timer settings (that is "set timer to exactly timestamp T"
> or "set timer as close to T as possible"). Then window timers at the end
> of window would set timers with this "limiting" setting (note that
> window.maxTimestamp() would have to change definition to be the first
> timestamp strictly greater than any timestamp that belongs to the window
> - it will actually be the first timestamp of new window with "limit"
> flag on). The impact on timers would be that events fired from @OnTimer
> would just propagate the flag to the WindowedValue being output.
>
> That way it seems to not matter how SDK internally handles time
> precision, as it would be transparent (at least seems to me). It is
> actually precisely what you proposed as "minus epsilon", only taken to
> the extreme. Looks useful to me and seems not that hard to implement.
> Although it would be of course a somewhat breaking change, because
> outputs of windows would become "3:00:00.000" instead of "2:59:59.999"
> (but I like the first one much more! :))

Yes, this is the "minus epsilon" idea, but assigning this as a bit on
the WindowedValue rather than on the Timestamp itself. This means that
pulling the timestamp out then re-assigning it would be lossy. (As a
basic example, imaging the batching DoFn that batches up elements
(with their respective metadata), calls an external service on the
full batch, and then emits the results (with the appropriate, cached,
metadata).

> On 10/30/19 10:32 PM, Robert Bradshaw wrote:
> > On Wed, Oct 30, 2019 at 2:00 AM Jan Lukavský  wrote:
> >> TL;DR - can we solve this by representing aggregations as not point-wise
> >> events in time, but time ranges? Explanation below.
> >>
> >> Hi,
> >>
> >> this is pretty interesting from a theoretical point of view. The
> >> question generally seems to be - having two events, can I reliably order
> >> them? One event might be end of one window and the other event might be
> >> start of another window. There is strictly required causality in this
> >> (although these events in fact have the same limiting timestamp). On the
> >> other hand, when I have two (random) point-wise events (one with
> >> timestamp T1 and the other with timestamp T2), then causality of these
> >> two depend on distance in space. If these two events differ by single
> >> nanosecond, then if they do not originate very "close" to each other,
> >> then there is high probability that different observers will see them in
> >> different order (and hence there is no causality possible and the order
> >> doesn't matter).
> >>
> >> That is to say - if I'm dealing with single external event (someone
> >> tells me something has happened at time T), then - around the boundaries
> >> of windows - it doesn't matter which window is this event placed into.
> >> There is no causal connection between start of window and the event.
> > I resolve this theoretical issue by working in a space with a single
> > time dimension and zero spatial dimensions. That is, the location of
> > an event is completely determined by a single coordinate in time, and
> > distance and causality are hence well defined for all possible
> > observers :). Realistically, there is both error and ambiguity in
> > assigning absolute timestamps to real-world events, but it's important
> > that this choice of ordering should be preserved (e.g. not compressed
> > away) by the system.
> >
> >> Different situation is when we actually perform an aggregation on
> >> multiple elements (GBK) - then although we produce output at 

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 3:34 PM Maximilian Michels  wrote:
>
> > One thing I don't understand is what it means for "CLI or REST API
> > context [to be] present." Where does this context come from? A config
> > file in a standard location on the user's machine? Or is this
> > something that is only present when a user uploads a jar and then
> > Flink runs it in a specific context? Or both?
>
> When you upload a Jar to Flink, it can be run by the Flink master.
> Running a Jar on the job manager will just invoke the main method. The
> same happens when you use the Flink CLI tool, the only difference being
> that the Jar runs on the user machine vs on the Flink master. In both
> cases, the Flink config file will be present in a standard location,
> i.e. "/conf/flink_conf.yaml".
>
> What Flink users typically do, is to call this method to construct a
> Flink dataflow (a.k.a. job):
>env = ExecutionEnvironment.getExecutionEnvironment()
>env.addSource().flatMap()
>env.execute()
> This registers an environment which holds the Flink dataflow definition.
>
> The Flink Runner also does this when flink_master is set to "[auto]".
> When it is set to "[local]", it will attempt to execute the whole
> pipeline locally (LocalEnvironment). When an address has been specified
> it will submit against a remote Flink cluster (RemoteEnvironment). The
> last two use cases do not make any sense for the user when they use the
> Python FlinkRunner which uses the jar upload feature of Flink. That's
> why "[auto]" is our choice for the FlinkUberJarJobServer feature of the
> Python FlinkRunner.
>
> In the case of the FlinkJarJobServer to use "[auto]" can also make sense
> because you can even specify a Flink config directory for the Flink job
> server to use. Without a config, auto will always fall back to local
> execution.

Thanks for clarifying. So when I run "./flink my_pipeline.jar"  or
upload the jar via the REST API (and its main method invoked on the
master) then [auto] reads the config and does the right thing, but if
I do java my_pipeline.jar it'll run locally.

> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
>
> Yes, it is valuable. I suspect we only want to enable it for local
> execution?

Yes.

> We could let the actual Runner handle this by falling back to
> the default environment in case it detects that the execution will not
> be local. It can simply tell Python then to shutdown the loopback
> server, or it shuts itself down after a timeout.

Python needs to know even whether to start up the loopback server, and
provide the address when submitting the pipeline. If I understood
correctly above, the only time that the job server interprets [auto]
as something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right? And baking anything in but [auto]
seems wrong...) So it seems we could guard using LOOPBACK it on this
flag + [local] or [auto].

> Another option would
> be, to only support it when the mode is set to "[local]".

Well, I'd really like to support it by default...

> On 30.10.19 21:05, Robert Bradshaw wrote:
> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
> >
> > On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw  
> > wrote:
> >>
> >> Sounds good to me.
> >>
> >> One thing I don't understand is what it means for "CLI or REST API
> >> context [to be] present." Where does this context come from? A config
> >> file in a standard location on the user's machine? Or is this
> >> something that is only present when a user uploads a jar and then
> >> Flink runs it in a specific context? Or both?
> >>
> >> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
> >>>
> >>> tl;dr:
> >>>
> >>> - I see consensus for inferring "http://"; in Python to align it with the
> >>> Java behavior which currently requires leaving out the protocol scheme.
> >>> Optionally, Java could also accept a scheme which gets removed as
> >>> required by the Flink Java Rest client.
> >>>
> >>> - We won't support "https://"; in Python for now, because it requires
> >>> add

Re: aggregating over triggered results

2019-10-30 Thread Robert Bradshaw
 windowing/triggering strategies at
https://window-explorer.appspot.com/ (though unfortunately processing
time triggers are not represented).

> 2) When you say "regardless of the how the problem is structured" there are 
> 4,000 stored 'sub-aggregations', even in the two-window approach--why is that 
> so? Isn't the volume of panes produced by a trigger a function of what keys 
> have actually received new values *in the window*?

True, if most 10-minute intervals that have no event then there are
further optimizations one can do.

> Thanks for help in understanding these details. I want to make good use of 
> Beam and hope to contribute back at some point (docs/writing etc), once I can 
> come to terms with all of these pieces.
>
> On 2019/10/29 20:39:18, Robert Bradshaw  wrote:
> > No matter how the problem is structured, computing 30 day aggregations
> > for every 10 minute window requires storing at least 30day/10min =
> > ~4000 sub-aggregations. In Beam, the elements themselves are not
> > stored in every window, only the intermediate aggregates.
> >
> > I second Luke's suggestion to try it out and see if this is indeed a
> > prohibitive bottleneck.
> >
> > On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik  wrote:
> > >
> > > You should first try the obvious answer of using a sliding window of 30 
> > > days every 10 minutes before you try the 60 days every 30 days.
> > > Beam has some optimizations which will assign a value to multiple windows 
> > > and only process that value once even if its in many windows. If that 
> > > doesn't perform well, then come back to dev@ and look to optimize.
> > >
> > > On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon  wrote:
> > >>
> > >> Hi I am new to Beam.
> > >>
> > >> I would like to accumulate data over 30 day period and perform a running 
> > >> aggregation over this data, say every 10 minutes.
> > >>
> > >> I could use a sliding window of 30 days every 10 minutes (triggering at 
> > >> end of window) but this seems grossly inefficient (both in terms of # of 
> > >> windows at play and # of events duplicated across these windows).
> > >>
> > >> A more efficient strategy seems to be to use a sliding window of 60 days 
> > >> every 30 days -- triggering every 10 minutes -- so that I'm guaranteed 
> > >> to have 30 days worth of data aggregated/combined in at least one of the 
> > >> 2 at-play sliding windows.
> > >>
> > >> The last piece of this puzzle however would be to do a final global 
> > >> aggregation over only the keys from the latest trigger of the earlier 
> > >> sliding window.
> > >>
> > >> But Beam does not seem to offer a way to orchestrate this. Even though 
> > >> this seems like it would be a pretty common or fundamental ask.
> > >>
> > >> One thought I had was to re-window in a way that would isolate keys 
> > >> triggered at the same time, in the same window but I don't see any 
> > >> contracts from Beam that would allow an approach like that.
> > >>
> > >> What am I missing?
> > >>
> > >>
> >


Re: RFC: python static typing PR

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 1:26 PM Chad Dombrova  wrote:
>
>> Do you believe that a future mypy plugin could replace pipeline type checks 
>> in Beam, or are there limits to what it can do?
>
> mypy will get us quite far on its own once we completely annotate the beam 
> code.  That said, my PR does not include my efforts to turn PTransforms into 
> Generics, which will be required to properly analyze pipelines, so there's 
> still a lot more work to do.  I've experimented with a mypy plugin to smooth 
> over some of the rough spots in that workflow and I will just say that the 
> mypy API has a very steep learning curve.
>
> Another thing to note: mypy is very explicit about function annotations.  It 
> does not do the "implicit" inference that Beam does, such as automatically 
> detecting function return types.  I think it should be possible to do a lot 
> of that as a mypy plugin, and in fact, since it has little to do with Beam it 
> could grow into its own project with outside contributors.

Yeah, I don't think, as is, it can replace what we do, but with
plugins I think it could possibly come closer. Certainly there is
information that is only available at runtime (e.g. reading from a
database or avro/parquet file could provide the schema which can be
used for downstream checking) which may limit the ability to do
everything statically (even Beam Java is moving this direction). Mypy
clearly has an implementation of the "is compatible with" operator
that I would love to borrow, but unfortunately it's not (easily?)
exposed.

That being said, we should leverage what we can for pipeline
authoring, and it'll be a great development too regardless.


Re: Python SDK timestamp precision

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 2:00 AM Jan Lukavský  wrote:
>
> TL;DR - can we solve this by representing aggregations as not point-wise
> events in time, but time ranges? Explanation below.
>
> Hi,
>
> this is pretty interesting from a theoretical point of view. The
> question generally seems to be - having two events, can I reliably order
> them? One event might be end of one window and the other event might be
> start of another window. There is strictly required causality in this
> (although these events in fact have the same limiting timestamp). On the
> other hand, when I have two (random) point-wise events (one with
> timestamp T1 and the other with timestamp T2), then causality of these
> two depend on distance in space. If these two events differ by single
> nanosecond, then if they do not originate very "close" to each other,
> then there is high probability that different observers will see them in
> different order (and hence there is no causality possible and the order
> doesn't matter).
>
> That is to say - if I'm dealing with single external event (someone
> tells me something has happened at time T), then - around the boundaries
> of windows - it doesn't matter which window is this event placed into.
> There is no causal connection between start of window and the event.

I resolve this theoretical issue by working in a space with a single
time dimension and zero spatial dimensions. That is, the location of
an event is completely determined by a single coordinate in time, and
distance and causality are hence well defined for all possible
observers :). Realistically, there is both error and ambiguity in
assigning absolute timestamps to real-world events, but it's important
that this choice of ordering should be preserved (e.g. not compressed
away) by the system.

> Different situation is when we actually perform an aggregation on
> multiple elements (GBK) - then although we produce output at certain
> event-time timestamp, this event is not "point-wise external", but is a
> running aggregation on multiple (and possibly long term) data. That is
> of course causally preceding opening a new (tumbling) window.
>
> The question now is - could we extend WindowedValue so that it doesn't
> contain only single point-wise event in time, but a time range in case
> of aggregations? Then the example of Window.into -> GBK -> Window.into
> could actually behave correctly (respecting causality) and another
> positive thing could be, that the resulting timestamp of the aggregation
> would no longer have to be window.maxTimestamp - 1 (which is kind of
> strange).
>
> This might be actually equivalent to the "minus epsilon" approach, but
> maybe better understandable. Moreover, this should be probably available
> to users, because one can easily get to the same problems when using
> stateful ParDo to perform a GBK-like aggregation.
>
> Thoughts?

This is quite an interesting idea. In some sense, timestamps become
like interval windows, and window assignment is similar to the window
mapping fns that we use for side inputs. I still think the idea of a
timestmap for an element and a window for an element is needed (e.g.
one can have elements in a single window, especially the global window
that have different timestamps), but this could be interesting to
explore. It could definitely get rid of the "minus epsilon" weirdness,
though I don't think it completely solves the granularity issues.

> On 10/30/19 1:05 AM, Robert Bradshaw wrote:
> > On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles  wrote:
> >> Point (1) is compelling. Solutions to the "minus epsilon" seem a bit 
> >> complex. On the other hand, an opaque and abstract Timestamp type (in each 
> >> SDK) going forward seems like a Pretty Good Idea (tm). Would you really 
> >> have to go floating point? Could you just have a distinguished 
> >> representation for non-inclusive upper/lower bounds? These could be at the 
> >> same reduced resolution as timestamps in element metadata, since that is 
> >> all they are compared against.
> > If I were coming up with an abstract, opaque representation of
> > Timestamp (and Duration) for Beam, I would explicitly include the
> > "minus epsilon" concept. One could still do arithmetic with these.
> > This would make any conversion to standard datetime libraries lossy
> > though.
> >
> >> Point (2) is also good, though it seems like something that could be 
> >> cleverly engineered and/or we just provide one implementation and it is 
> >> easy to make your own for finer granularity, since a WindowFn separately 
> >> receives the Timestamp (here I'm pretending i

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw  wrote:
>
> Sounds good to me.
>
> One thing I don't understand is what it means for "CLI or REST API
> context [to be] present." Where does this context come from? A config
> file in a standard location on the user's machine? Or is this
> something that is only present when a user uploads a jar and then
> Flink runs it in a specific context? Or both?
>
> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
> >
> > tl;dr:
> >
> > - I see consensus for inferring "http://"; in Python to align it with the
> > Java behavior which currently requires leaving out the protocol scheme.
> > Optionally, Java could also accept a scheme which gets removed as
> > required by the Flink Java Rest client.
> >
> > - We won't support "https://"; in Python for now, because it requires
> > additional SSL setup, i.e. parsing the Flink config file and setting up
> > the truststore
> >
> > - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> > via https://issues.apache.org/jira/browse/BEAM-8507
> >
> >
> > Additional comments below:
> >
> > > One concern with this is that just supplying host:port is the existing
> > > behavior, so we can't start requiring the http://
> >
> > I wouldn't require it but optionally support it, otherwise add it
> > automatically.
> >
> > > One question I have is if there are other
> > > authentication parameters that may be required for speaking to a flink
> > > endpoint that we should be aware of (that would normally be buried in
> > > the config file).
> >
> > Yes, essentially all the SSL configuration is in the config file,
> > including the location of the truststore, the password, certificates, etc.
> >
> > For now, I would say we cannot properly support SSL in Python, unless we
> > find a way to load the truststore from Python.
> >
> > > I do like being explicit with something like [local] rather than
> > > treating the empty string in a magical way.
> >
> > Fine, we can keep "[local]" and throw an error in case the address is
> > empty. Let's also throw an error in case the Flink CLI tool is used with
> > local execution because that is clearly not what the user wants.
> >
> > >> I'd like to see this issue resolved before 2.17 as changing the public 
> > >> API once it's released will be harder.
> > >
> > > +1. In particular, I misunderstood that [auto] is not supported by 
> > > `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
> > > Python 3.6+.
> >
> > +1 Let's fix this in time for the release.
> >
> > > The user shouldn't have to specify a protocol for Python, I think it's 
> > > preferable and reasonable to handle that for them in order to maintain 
> > > existing behavior and align with Java SDK.
> >
> > +1
> >
> > > Looks like we also have a [collection] configuration value [1].
> >
> > Yeah, I think it is acceptable to remove this entirely. This has never
> > been used by anyone and is an unmaintained Flink feature.
> >
> > > I would find it acceptable to interpret absence of the option as 
> > > "[auto]", which really means use CLI or REST API context when present, or 
> > > local. I would prefer to not have an empty string value default (but 
> > > rather None/null) and no additional magic values.
> >
> > Let's keep "[auto]" then to keep it explicit. An empty string should
> > throw an error.
> >
> >
> > > One more reason that was not part of this discussion yet.
> >
> > @Jan: Supporting context classloaders in local mode is a new feature and
> > for keeping it simple, I'd start a new thread for it.
> >
> >
> > On 29.10.19 10:55, Jan Lukavský wrote:
> > > Hi,
> > >
> > > +1 for empty string being interpreted as [auto] and anything else having
> > > explicit notation.
> > >
> > > One more reason that was not part of this discussion yet. In [1] there
> > > was a discussion about LocalEnvironment (that is the one that is
> > > responsible for spawning in process Flink cluster) not using context
> > > classloade

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
>
> tl;dr:
>
> - I see consensus for inferring "http://"; in Python to align it with the
> Java behavior which currently requires leaving out the protocol scheme.
> Optionally, Java could also accept a scheme which gets removed as
> required by the Flink Java Rest client.
>
> - We won't support "https://"; in Python for now, because it requires
> additional SSL setup, i.e. parsing the Flink config file and setting up
> the truststore
>
> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> via https://issues.apache.org/jira/browse/BEAM-8507
>
>
> Additional comments below:
>
> > One concern with this is that just supplying host:port is the existing
> > behavior, so we can't start requiring the http://
>
> I wouldn't require it but optionally support it, otherwise add it
> automatically.
>
> > One question I have is if there are other
> > authentication parameters that may be required for speaking to a flink
> > endpoint that we should be aware of (that would normally be buried in
> > the config file).
>
> Yes, essentially all the SSL configuration is in the config file,
> including the location of the truststore, the password, certificates, etc.
>
> For now, I would say we cannot properly support SSL in Python, unless we
> find a way to load the truststore from Python.
>
> > I do like being explicit with something like [local] rather than
> > treating the empty string in a magical way.
>
> Fine, we can keep "[local]" and throw an error in case the address is
> empty. Let's also throw an error in case the Flink CLI tool is used with
> local execution because that is clearly not what the user wants.
>
> >> I'd like to see this issue resolved before 2.17 as changing the public API 
> >> once it's released will be harder.
> >
> > +1. In particular, I misunderstood that [auto] is not supported by 
> > `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
> > Python 3.6+.
>
> +1 Let's fix this in time for the release.
>
> > The user shouldn't have to specify a protocol for Python, I think it's 
> > preferable and reasonable to handle that for them in order to maintain 
> > existing behavior and align with Java SDK.
>
> +1
>
> > Looks like we also have a [collection] configuration value [1].
>
> Yeah, I think it is acceptable to remove this entirely. This has never
> been used by anyone and is an unmaintained Flink feature.
>
> > I would find it acceptable to interpret absence of the option as "[auto]", 
> > which really means use CLI or REST API context when present, or local. I 
> > would prefer to not have an empty string value default (but rather 
> > None/null) and no additional magic values.
>
> Let's keep "[auto]" then to keep it explicit. An empty string should
> throw an error.
>
>
> > One more reason that was not part of this discussion yet.
>
> @Jan: Supporting context classloaders in local mode is a new feature and
> for keeping it simple, I'd start a new thread for it.
>
>
> On 29.10.19 10:55, Jan Lukavský wrote:
> > Hi,
> >
> > +1 for empty string being interpreted as [auto] and anything else having
> > explicit notation.
> >
> > One more reason that was not part of this discussion yet. In [1] there
> > was a discussion about LocalEnvironment (that is the one that is
> > responsible for spawning in process Flink cluster) not using context
> > classloader and thus can fail loading some user code (if this code was
> > added to context classloader *after* application has been run).
> > LocalEnvironment on the other hand supposes that all classes can be
> > loaded by applicaiton's classloader and doesn't accept any "client
> > jars". Therefore - when application generates classes dynamically during
> > runtime it is currently impossible to run those using local flink
> > runner. There is a nasty hack for JDK <= 8 (injecting URL into
> > applications URLClassLoader), but that fails hard on JDK >= 9 (obviously).
> >
> > The conclusion from that thread is that it could be solved by manually
> > running MiniCluster (which will run on localhost:8081 by default) and
> > then use this REST address for RemoteEnvironment so that the application
> > would be actually submitted as if it would be run on remote cluster and
> > therefore a dynamically generated JAR can be attached to it.
> >
> > That would mean, that we can actually have two "local" modes - one using
> > LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
> > for whatever reason we would like to keep both mode of local operation).
> > That could mean two masters - e.g. [local] and [local-over-remote] or

Re: Python SDK timestamp precision

2019-10-29 Thread Robert Bradshaw
On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles  wrote:
>
> Point (1) is compelling. Solutions to the "minus epsilon" seem a bit complex. 
> On the other hand, an opaque and abstract Timestamp type (in each SDK) going 
> forward seems like a Pretty Good Idea (tm). Would you really have to go 
> floating point? Could you just have a distinguished representation for 
> non-inclusive upper/lower bounds? These could be at the same reduced 
> resolution as timestamps in element metadata, since that is all they are 
> compared against.

If I were coming up with an abstract, opaque representation of
Timestamp (and Duration) for Beam, I would explicitly include the
"minus epsilon" concept. One could still do arithmetic with these.
This would make any conversion to standard datetime libraries lossy
though.

> Point (2) is also good, though it seems like something that could be cleverly 
> engineered and/or we just provide one implementation and it is easy to make 
> your own for finer granularity, since a WindowFn separately receives the 
> Timestamp (here I'm pretending it is abstract and opaque and likely 
> approximate) and the original element with whatever precision the original 
> data included.

Yes, but I don't see how a generic WindowFn would reach into the
(arbitrary) element and pull out this original data. One of the
benefits of the Beam model is that the WindowFn does not have to
depend on the element type.

> Point (3) the model/runner owns the timestamp metadata so I feel fine about 
> it being approximated as long as any original user data is still present. I 
> don't recall seeing a compelling case where the timestamp metadata that the 
> runner tracks and understands is required to be exactly the same as a user 
> value (assuming users understand this distinction, which is another issue 
> that I would separate from whether it is technically feasible).

As we provide the ability to designate user data as the runner
timestamp against which to window, and promote the runner timestamp
back to user data (people are going to want to get DateTime or Instant
objects out of it), it seems tricky to explain to users that one or
both of these operations may be lossy (and, in addition, I don't think
there's a consistently safe direction to round).

> The more I think about the very real problems you point out, the more I think 
> that our backwards-incompatible move should be to our own abstract Timestamp 
> type, putting the design decision behind a minimal interface. If we see a 
> concrete design for that data type, we might be inspired how to support more 
> possibilities.
>
> As for the rest of the speculation... moving to nanos immediately helps users 
> so I am now +1 on just doing it, or moving ahead with an abstract data type 
> under the assumption that it will basically be nanos under the hood.

If the fact that it's stored as nanos under the hood leaks out (and I
have trouble seeing how it won't) I'd lean towards just using them
directly (e.g. Java Instant) rather than wrapping it.

> Having a cleverly resolution-independent system is interesting and maybe 
> extremely future proof but maybe preparing for a very distant future that may 
> never come.
>
> Kenn
>
> On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw  wrote:
>>
>> TL;DR: We should just settle on nanosecond precision ubiquitously for 
>> timestamp/windowing in Beam.
>>
>>
>> Re-visiting this discussion in light of cross-language transforms and 
>> runners, and trying to tighten up testing. I've spent some more time 
>> thinking about how we could make these operations granularity-agnostic, but 
>> just can't find a good solution. In particular, the sticklers seem to be:
>>
>> (1) Windows are half-open intervals, and the timestamp associated with a 
>> window coming out of a GBK is (by default) as large as possible but must 
>> live in that window. (Otherwise WindowInto + GBK + WindowInto would have the 
>> unforunate effect of moving aggregate values into subsequent windows, which 
>> is clearly not the intent.) In other words, the timestamp of a grouped value 
>> is basically End(Window) - epsilon. Unless we choose a representation able 
>> to encode "minus epsilon" we must agree on a granularity.
>>
>> (2) Unless we want to have multiple vairants of all our WindowFns (e.g. 
>> FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a 
>> granularity with which to parameterize these well-known operations. There 
>> are cases (e.g. side input window mapping, merging) where these Fns may be 
>> used downstream in contexts other than where they are applied/defined.
>>
>> (3) Reification 

Re: aggregating over triggered results

2019-10-29 Thread Robert Bradshaw
No matter how the problem is structured, computing 30 day aggregations
for every 10 minute window requires storing at least 30day/10min =
~4000 sub-aggregations. In Beam, the elements themselves are not
stored in every window, only the intermediate aggregates.

I second Luke's suggestion to try it out and see if this is indeed a
prohibitive bottleneck.

On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik  wrote:
>
> You should first try the obvious answer of using a sliding window of 30 days 
> every 10 minutes before you try the 60 days every 30 days.
> Beam has some optimizations which will assign a value to multiple windows and 
> only process that value once even if its in many windows. If that doesn't 
> perform well, then come back to dev@ and look to optimize.
>
> On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon  wrote:
>>
>> Hi I am new to Beam.
>>
>> I would like to accumulate data over 30 day period and perform a running 
>> aggregation over this data, say every 10 minutes.
>>
>> I could use a sliding window of 30 days every 10 minutes (triggering at end 
>> of window) but this seems grossly inefficient (both in terms of # of windows 
>> at play and # of events duplicated across these windows).
>>
>> A more efficient strategy seems to be to use a sliding window of 60 days 
>> every 30 days -- triggering every 10 minutes -- so that I'm guaranteed to 
>> have 30 days worth of data aggregated/combined in at least one of the 2 
>> at-play sliding windows.
>>
>> The last piece of this puzzle however would be to do a final global 
>> aggregation over only the keys from the latest trigger of the earlier 
>> sliding window.
>>
>> But Beam does not seem to offer a way to orchestrate this. Even though this 
>> seems like it would be a pretty common or fundamental ask.
>>
>> One thought I had was to re-window in a way that would isolate keys 
>> triggered at the same time, in the same window but I don't see any contracts 
>> from Beam that would allow an approach like that.
>>
>> What am I missing?
>>
>>


Re: Python Precommit duration pushing 2 hours

2019-10-29 Thread Robert Bradshaw
https://github.com/apache/beam/pull/9925

On Tue, Oct 29, 2019 at 10:24 AM Udi Meiri  wrote:
>
> I don't have the bandwidth right now to tackle this. Feel free to take it.
>
> On Tue, Oct 29, 2019 at 10:16 AM Robert Bradshaw  wrote:
>>
>> The Python SDK does as well. These calls are coming from
>> to_runner_api, is_stateful_dofn, and validate_stateful_dofn which are
>> invoked once per pipene or bundle. They are, however, surprisingly
>> expensive. Even memoizing across those three calls should save a
>> significant amount of time. Udi, did you want to tackle this?
>>
>> Looking at the profile, Pipeline.to_runner_api() is being called 30
>> times in this test, and [Applied]PTransform.to_fn_api being called
>> 3111 times, so that in itself might be interesting to investigate.
>>
>> On Tue, Oct 29, 2019 at 8:26 AM Robert Burke  wrote:
>> >
>> > As does the Go SDK. Invokers are memoized and when possible code is 
>> > generated to avoid reflection.
>> >
>> > On Tue, Oct 29, 2019, 6:46 AM Kenneth Knowles  wrote:
>> >>
>> >> Noting for the benefit of the thread archive in case someone goes digging 
>> >> and wonders if this affects other SDKs: the Java SDK memoizes 
>> >> DoFnSignatures and generated DoFnInvoker classes.
>> >>
>> >> Kenn
>> >>
>> >> On Mon, Oct 28, 2019 at 6:59 PM Udi Meiri  wrote:
>> >>>
>> >>> Re: #9283 slowing down tests, ideas for slowness:
>> >>> 1. I added a lot of test cases, some with locally run pipelines.
>> >>> 2. The PR somehow changed how coders are selected, and now we're using 
>> >>> less efficient ones.
>> >>> 3. New dependency funcsigs is slowing things down? (py2 only)
>> >>>
>> >>> I ran "pytest -k PipelineAnalyzerTest --profile-svg" on 2.7 and 3.7 and 
>> >>> got these cool graphs (attached).
>> >>> 2.7: core:294:get_function_arguments takes 56.66% of CPU time (IIUC), 
>> >>> gets called ~230k times
>> >>> 3.7: core:294:get_function_arguments 30.88%, gets called ~200k times
>> >>>
>> >>> After memoization of get_function_args_defaults:
>> >>> 2.7: core:294:get_function_arguments 20.02%
>> >>> 3.7: core:294:get_function_arguments 8.11%
>> >>>
>> >>>
>> >>> On Mon, Oct 28, 2019 at 5:38 PM Pablo Estrada  wrote:
>> >>>>
>> >>>> *not deciles, but 9-percentiles : )
>> >>>>
>> >>>> On Mon, Oct 28, 2019 at 5:31 PM Pablo Estrada  
>> >>>> wrote:
>> >>>>>
>> >>>>> I've ran the tests in Python 2 (without cython), and used a utility to 
>> >>>>> track runtime for each test method. I found some of the following 
>> >>>>> things:
>> >>>>> - Total test methods run: 2665
>> >>>>> - Total test runtime: 990 seconds
>> >>>>> - Deciles of time spent:
>> >>>>>   - 1949 tests run in the first 9% of time
>> >>>>>   - 173 in the 9-18% rang3e
>> >>>>>   - 130 in the 18-27% range
>> >>>>>   - 95 in the 27-36% range
>> >>>>>   - 77
>> >>>>>   - 66
>> >>>>>   - 55
>> >>>>>   - 46
>> >>>>>   - 37
>> >>>>>   - 24
>> >>>>>   - 13 tests run in the last 9% of time. This represents about 1 
>> >>>>> minute and a half.
>> >>>>>
>> >>>>> We may be able to look at the slowest X tests, and get gradual 
>> >>>>> improvements from there. Although it seems .. not dramatic ones : )
>> >>>>>
>> >>>>> FWIW I uploaded the results here: 
>> >>>>> https://storage.googleapis.com/apache-beam-website-pull-requests/python-tests/nosetimes.json
>> >>>>>
>> >>>>> The slowest 13 tests were:
>> >>>>>
>> >>>>> [('apache_beam.runners.interactive.pipeline_analyzer_test.PipelineAnalyzerTest.test_basic',
>> >>>>>   5.253582000732422),
>> >>>>>  
>> >>>>> ('apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_wordcount',
>> >>>>>   7.90

Re: Python Precommit duration pushing 2 hours

2019-10-29 Thread Robert Bradshaw
gt;>>   13.993916988372803),
>>>>>  
>>>>> ('apache_beam.runners.interactive.pipeline_analyzer_test.PipelineAnalyzerTest.test_read_cache_expansion',
>>>>>   6.3383049964904785),
>>>>>  
>>>>> ('apache_beam.runners.interactive.pipeline_analyzer_test.PipelineAnalyzerTest.test_word_count',
>>>>>   9.157485008239746),
>>>>>  
>>>>> ('apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses.test_pardo_side_and_main_outputs',
>>>>>   5.191173076629639),
>>>>>  
>>>>> ('apache_beam.io.vcfio_test.VcfSourceTest.test_pipeline_read_file_pattern_large',
>>>>>   6.2221620082855225),
>>>>>  
>>>>> ('apache_beam.io.fileio_test.WriteFilesTest.test_streaming_complex_timing',
>>>>>   7.7187910079956055)]
>>>>>
>>>>> On Mon, Oct 28, 2019 at 3:10 PM Pablo Estrada  wrote:
>>>>>>
>>>>>> I have written https://github.com/apache/beam/pull/9910 to reduce 
>>>>>> FnApiRunnerTest variations.
>>>>>> I'm not in a rush to merge, but rather happy to start a discussion.
>>>>>> I'll also try to figure out if there are other tests slowing down the 
>>>>>> suite significantly.
>>>>>> Best
>>>>>> -P.
>>>>>>
>>>>>> On Fri, Oct 25, 2019 at 7:41 PM Valentyn Tymofieiev 
>>>>>>  wrote:
>>>>>>>
>>>>>>> Thanks, Brian.
>>>>>>> +Udi Meiri
>>>>>>> As next step, it would be good to know whether slowdown is caused by 
>>>>>>> tests in this PR, or its effect on other tests, and to confirm that 
>>>>>>> only Python 2 codepaths were affected.
>>>>>>>
>>>>>>> On Fri, Oct 25, 2019 at 6:35 PM Brian Hulette  
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> I did a bisect based on the runtime of `./gradlew 
>>>>>>>> :sdks:python:test-suites:tox:py2:testPy2Gcp` around the commits 
>>>>>>>> between 9/1 and 9/15 to see if I could find the source of the spike 
>>>>>>>> that happened around 9/6. It looks like it was due to PR#9283 [1]. I 
>>>>>>>> thought maybe this search would reveal some mis-guided configuration 
>>>>>>>> change, but as far as I can tell 9283 just added a well-tested 
>>>>>>>> feature. I don't think there's anything to learn from that... I just 
>>>>>>>> wanted to circle back about it in case others are curious about that 
>>>>>>>> spike.
>>>>>>>>
>>>>>>>> I'm +1 on bumping some FnApiRunner configurations.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/9283
>>>>>>>>
>>>>>>>> On Fri, Oct 25, 2019 at 4:49 PM Pablo Estrada  
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> I think it makes sense to remove some of the extra FnApiRunner 
>>>>>>>>> configurations. Perhaps some of the multiworkers and some of the grpc 
>>>>>>>>> versions?
>>>>>>>>> Best
>>>>>>>>> -P.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 25, 2019 at 12:27 PM Robert Bradshaw 
>>>>>>>>>  wrote:
>>>>>>>>>>
>>>>>>>>>> It looks like fn_api_runner_test.py is quite expensive, taking 10-15+
>>>>>>>>>> minutes on each version of Python. This test consists of a base class
>>>>>>>>>> that is basically a validates runner suite, and is then run in 
>>>>>>>>>> several
>>>>>>>>>> configurations, many more of which (including some expensive ones)
>>>>>>>>>> have been added lately.
>>>>>>>>>>
>>>>>>>>>> class FnApiRunnerTest(unittest.TestCase):
>>>>>>>>>> class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
>>>>>>>>>> class FnApiRunnerTestWithGrpcMultiThreade

Re: Rethinking the Flink Runner modes

2019-10-28 Thread Robert Bradshaw
Thanks for bringing this to the list. Some comments below, though it
would be good to get additional feedback beyond those that have been
participating on the PR, if any. I'd like to see this issue resolved
before 2.17 as changing the public API once it's released will be
harder.

On Mon, Oct 28, 2019 at 6:36 AM Maximilian Michels  wrote:
>
> Hi,
>
> Robert and Kyle have been doing great work to simplify submitting
> portable pipelines with the Flink Runner. Part of this is having a
> Python "FlinkRunner" which handles bringing up a Beam job server and
> submitting the pipeline directly via the Flink REST API. One building
> block is the creation of "executable Jars" which contain the
> materialized / translated Flink pipeline and do not require the Beam job
> server or the Python driver anymore.
>
> While unifying a newly introduced option "flink_master_url" with the
> pre-existing "flink_master" [1][2], some questions came up about Flink's
> execution modes. (The two options are meant to do the same thing:
> provide the address of the Flink master to hand-over the translated
> pipeline.)
>
> Historically, Flink had a proprietary protocol for submitting pipelines,
> running on port 9091. This has since been replaced with a REST protocol
> at port 8081. To this date, this has implications how you submit
> programs, e.g. the Flink client libraries expects the address to be of
> form "host:port", without a protocol scheme. On the other hand, external
> Rest libraries typically expect a protocol scheme.
>
> But this is only half of the fun. There are also special addresses for
> "flink_master" that influence submission of the pipeline. If you specify
> "[local]" as the address, the pipeline won't be submitted but executed
> in a local in-process Flink cluster. If you specify "[auto]" and you use
> the CLI tool that comes bundled with Flink, then the master address will
> be loaded from the Flink config, including any configuration like SSL.
> If none is found, then it falls back to "[local]".
>
> This is a bit odd, and after a discussion with Robert and Thomas in [1],
> we figured that this needs to be changed:
>
> 1. Make the master address a URL. Add "http://"; to "flink_master" in
> Python if no scheme is specified. Similarly, remove any "http://"; in
> Java, since the Java rest client does not expect a scheme. In case of
> "http_s_://", we have a special treatment to load the SSL settings from
> the Flink config.

One concern with this is that just supplying host:port is the existing
behavior, so we can't start requiring the http://. The question
becomes whether we should allow it, or infer http[s] (e.g. by trying
out https first). One question I have is if there are other
authentication parameters that may be required for speaking to a flink
endpoint that we should be aware of (that would normally be buried in
the config file).

> 2. Deprecate the "[auto]" and "[local]" values. It should be sufficient
> to have either a non-empty address string or an empty one. The empty
> string would either mean local execution or, in the context of the Flink
> CLI tool, loading the master address from the config. The non-empty
> string would be interpreted as a cluster address.

I do like being explicit with something like [local] rather than
treating the empty string in a magical way. Perhaps we could have
[config] be an alias for [auto] indicating to read the config to get
the parameter (if any). The tricky bit it seems is that if running as
CLI (where, as I understand it, the jar is then executed on the
cluster) one does not want to run it truly locally but on that
cluster.

For non-CLI, the two options for default are to always run locally
(the least surprising to me, but it'd be good to get feedback from
others) or to read the config file and submit it to the value there,
if any.

> Any opinions on this?
>
>
> Thanks,
> Max
>
>
> [1] https://github.com/apache/beam/pull/9803
> [2] https://github.com/apache/beam/pull/9844


Re: RFC: python static typing PR

2019-10-28 Thread Robert Bradshaw
Thanks, Chad, this has been a herculean task. I'm excited for the
additional tooling and documentation explicit types can bring to our
code, even if tooling such as mypy isn't able to do as much inference
for obvious cases as I would like.

This will, of course, put another burden on developers in contributing
code, similar to lint and writing unit tests. IMHO this will be worth
the cost in terms of encouraging better code, but I think it's
important to establish consensus if this is the direction we're
moving. So if you have any thoughts or questions, positive or
negative, please comment.


On Mon, Oct 28, 2019 at 10:34 AM Chad Dombrova  wrote:
>
> Hi all,
> I've been working on a PR to add static typing to the beam python sdk for the 
> past 4 months or so.  This has been an epic journey which has required 
> chasing down numerous fixes across several other projects (mypy, pylint, 
> python-future), but the mypy tests are now passing!
>
> I'm not sure how much convincing I need to do with this group on the benefits 
> of static typing, especially considering the heavy Java influence on this 
> project, but for those who are curious, there's some good info here:  
> https://realpython.com/python-type-checking/#pros-and-cons.  Suffice it to 
> say, it's a game changer for projects like Beam (large code base, high level 
> of quality, good testing infrastructure), and it dovetails perfectly into the 
> efforts surrounding pipeline type analysis and serialization.  Anecdotally 
> speaking, all of the developers I've worked with who were originally 
> resistant to static typing in python ("it's ugly!"  "it's not pythonic!") 
> have changed their tune and fully embraced it because of the time that it 
> saves them every day.
>
> More details over at the PR:  https://github.com/apache/beam/pull/9056
>
> I look forward to your feedback!
>
> -chad
>


Re: JIRA priorities explaination

2019-10-25 Thread Robert Bradshaw
+1 to both.

On Fri, Oct 25, 2019 at 3:58 PM Valentyn Tymofieiev  wrote:
>
> On Fri, Oct 25, 2019 at 3:39 PM Kenneth Knowles  wrote:
>>
>> Suppose, hypothetically, we say that if Fix Version is set, then P0/Blocker 
>> and P1/Critical block release and lower priorities get bumped.
>
>
> +1 to Kenn's suggestion.  In addition, we can discourage setting Fix version 
> for non-critical issues before issues are fixed.
>
>>
>>
>> Most likely the release manager still pings and asks about all those before 
>> bumping. Which means that in effect they were part of the burn down and do 
>> block the release in the sense that they must be re-triaged away to the next 
>> release. I would prefer less work for the release manager and more emphasis 
>> on the default being nonblocking.
>>
>> One very different possibility is to ignore Fix Version on open bugs and use 
>> a different search query as the burndown, auto bump everything that didn't 
>> make it.
>
> This may create a situation where an issue will eventually be closed, but Fix 
> Version not updated, and confuse the users who will rely "Fix Version" to  
> find which release actually fixes the issue. A pass over open bugs with a Fix 
> Version set to next release (as currently done by a release manager) helps to 
> make sure that unfixed bugs won't have Fix Version tag of the upcoming 
> release.
>
>>
>> Kenn
>>
>> On Fri, Oct 25, 2019, 14:16 Robert Bradshaw  wrote:
>>>
>>> I'm fine with that, but in that case we should have a priority for
>>> release blockers, below which bugs get automatically bumped to the
>>> next release (and which becomes the burndown list).
>>>
>>> On Fri, Oct 25, 2019 at 1:58 PM Kenneth Knowles  wrote:
>>> >
>>> > My takeaway from this thread is that priorities should have a shared 
>>> > community intuition and/or policy around how they are treated, which 
>>> > could eventually be formalized into SLOs.
>>> >
>>> > At a practical level, I do think that build breaks are higher priority 
>>> > than release blockers. If you are on this thread but not looking at the 
>>> > PR, here is the verbiage I added about urgency:
>>> >
>>> > P0/Blocker: "A P0 issue is more urgent than simply blocking the next 
>>> > release"
>>> > P1/Critical: "Most critical bugs should block release"
>>> > P2/Major: "No special urgency is associated"
>>> > ...
>>> >
>>> > Kenn
>>> >
>>> > On Fri, Oct 25, 2019 at 11:46 AM Robert Bradshaw  
>>> > wrote:
>>> >>
>>> >> We cut a release every 6 weeks, according to schedule, making it easy
>>> >> to plan for, and the release manager typically sends out a warning
>>> >> email to remind everyone. I don't think it makes sense to do that for
>>> >> every ticket. Blockers should be reserved for things we really
>>> >> shouldn't release without.
>>> >>
>>> >> On Fri, Oct 25, 2019 at 11:33 AM Pablo Estrada  
>>> >> wrote:
>>> >> >
>>> >> > I mentioned on the PR that I had been using the 'blocker' priority 
>>> >> > along with the 'fix version' field to mark issues that I want to get 
>>> >> > in the release.
>>> >> > Of course, this little practice of mine only matters much around 
>>> >> > release branch cutting time - and has been useful for me to track 
>>> >> > which things I want to ensure getting into the release / bump to the 
>>> >> > next /etc.
>>> >> > I've also found it to be useful as a way to communicate with the 
>>> >> > release manager without having to sync directly.
>>> >> >
>>> >> > What would be a reasonable way to tell the release manager "I'd like 
>>> >> > to get this feature in. please talk to me if you're about to cut the 
>>> >> > branch" - that also uses the priorities appropriately? - and that 
>>> >> > allows the release manager to know when a fix version is "more 
>>> >> > optional" / "less optional"?
>>> >> >
>>> >> > On Wed, Oct 23, 2019 at 12:20 PM Kenneth Knowles  
>>> >> > wrote:
>>> >> >>
>>> >> >> I finally got around to 

Re: Multiple Outputs from Expand in Python

2019-10-25 Thread Robert Bradshaw
You can literally return a Python tuple of outputs from a composite
transform as well. (Dicts with PCollections as values are also
supported, if you want things to be named rather than referenced by
index.)

On Fri, Oct 25, 2019 at 4:06 PM Ahmet Altay  wrote:
>
> Is DoOutputsTuple what you are looking for? [1] You can look at this expand 
> function using it [2].
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pvalue.py#L204
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1283
>
> On Fri, Oct 25, 2019 at 3:51 PM Luke Cwik  wrote:
>>
>> My example is about multiple inputs and not multiple outputs from further 
>> investigation it seems as I don't know.
>>
>> Looking at the documentation online[1] doesn't seem to specify how to do 
>> this either for composite transforms. All the examples are of the single 
>> output variety as well[2].
>>
>> 1: 
>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>> 2: 
>> https://github.com/apache/beam/blob/4ba731fe93f7f8385c771caf576745d14edf34b8/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
>>
>> On Fri, Oct 25, 2019 at 10:24 AM Luke Cwik  wrote:
>>>
>>> I believe PCollectionTuple should be unnecessary since Python has first 
>>> class support for tuples as shown in the example below[1]. Can we use 
>>> tuples to solve your issue?
>>>
>>> wordsStartingWithA = \
>>> p | 'Words starting with A' >> beam.Create(['apple', 'ant', 'arrow'])
>>>
>>> wordsStartingWithB = \
>>> p | 'Words starting with B' >> beam.Create(['ball', 'book', 'bow'])
>>>
>>> ((wordsStartingWithA, wordsStartingWithB)
>>> | beam.Flatten()
>>> | LogElements())
>>>
>>> 1: 
>>> https://github.com/apache/beam/blob/238659bce8043e6a64619a959ab44453dbe22dff/learning/katas/python/Core%20Transforms/Flatten/Flatten/task.py#L29
>>>
>>> On Fri, Oct 25, 2019 at 10:11 AM Sam Rohde  wrote:

 Talked to Daniel offline and it looks like the Python SDK is missing 
 PCollection Tuples like the one Java has: 
 https://github.com/rohdesamuel/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java.

 I'll go ahead and implement that for the Python SDK.

 On Thu, Oct 24, 2019 at 5:20 PM Sam Rohde  wrote:
>
> Hey All,
>
> I'm trying to implement an expand override with multiple output 
> PCollections. The kicker is that I want to insert a new transform for 
> each output PCollection. How can I do this?
>
> Regards,
> Sam


Re: JIRA priorities explaination

2019-10-25 Thread Robert Bradshaw
I'm fine with that, but in that case we should have a priority for
release blockers, below which bugs get automatically bumped to the
next release (and which becomes the burndown list).

On Fri, Oct 25, 2019 at 1:58 PM Kenneth Knowles  wrote:
>
> My takeaway from this thread is that priorities should have a shared 
> community intuition and/or policy around how they are treated, which could 
> eventually be formalized into SLOs.
>
> At a practical level, I do think that build breaks are higher priority than 
> release blockers. If you are on this thread but not looking at the PR, here 
> is the verbiage I added about urgency:
>
> P0/Blocker: "A P0 issue is more urgent than simply blocking the next release"
> P1/Critical: "Most critical bugs should block release"
> P2/Major: "No special urgency is associated"
> ...
>
> Kenn
>
> On Fri, Oct 25, 2019 at 11:46 AM Robert Bradshaw  wrote:
>>
>> We cut a release every 6 weeks, according to schedule, making it easy
>> to plan for, and the release manager typically sends out a warning
>> email to remind everyone. I don't think it makes sense to do that for
>> every ticket. Blockers should be reserved for things we really
>> shouldn't release without.
>>
>> On Fri, Oct 25, 2019 at 11:33 AM Pablo Estrada  wrote:
>> >
>> > I mentioned on the PR that I had been using the 'blocker' priority along 
>> > with the 'fix version' field to mark issues that I want to get in the 
>> > release.
>> > Of course, this little practice of mine only matters much around release 
>> > branch cutting time - and has been useful for me to track which things I 
>> > want to ensure getting into the release / bump to the next /etc.
>> > I've also found it to be useful as a way to communicate with the release 
>> > manager without having to sync directly.
>> >
>> > What would be a reasonable way to tell the release manager "I'd like to 
>> > get this feature in. please talk to me if you're about to cut the branch" 
>> > - that also uses the priorities appropriately? - and that allows the 
>> > release manager to know when a fix version is "more optional" / "less 
>> > optional"?
>> >
>> > On Wed, Oct 23, 2019 at 12:20 PM Kenneth Knowles  wrote:
>> >>
>> >> I finally got around to writing some of this up. It is minimal. Feedback 
>> >> is welcome, especially if what I have written does not accurately 
>> >> represent the community's approach.
>> >>
>> >> https://github.com/apache/beam/pull/9862
>> >>
>> >> Kenn
>> >>
>> >> On Mon, Feb 11, 2019 at 3:21 PM Daniel Oliveira  
>> >> wrote:
>> >>>
>> >>> Ah, sorry, I missed that Alex was just quoting from our Jira 
>> >>> installation (didn't read his email closely enough). Also I wasn't aware 
>> >>> about those pages on our website.
>> >>>
>> >>> Seeing as we do have definitions for our priorities, I guess my main 
>> >>> request would be that they be made more discoverable somehow. I don't 
>> >>> think the tooltips are reliable, and the pages on the website are 
>> >>> informative, but hard to find. Since it feels a bit lazy to say "this 
>> >>> isn't discoverable enough" without suggesting any improvements, I'd like 
>> >>> to propose these two changes:
>> >>>
>> >>> 1. We should write a Beam Jira Guide with basic information about our 
>> >>> Jira. I think the bug priorities should go in here, but also anything 
>> >>> else we would want someone to know before filing any Jira issues, like 
>> >>> how our components are organized or what the different issue types mean. 
>> >>> This guide could either be written in the website or the wiki, but I 
>> >>> think it should definitely be linked in 
>> >>> https://beam.apache.org/contribute/ so that newcomers read it before 
>> >>> getting their Jira account approved. The goal here being to have a 
>> >>> reference for the basics of our Jira since at the moment it doesn't seem 
>> >>> like we have anything for this.
>> >>>
>> >>> 2. The existing info on Post-commit and pre-commit policies doesn't seem 
>> >>> very discoverable to someone monitoring the Pre/Post-commits. I've 
>> >&g

Re: [DISCUSS] How to stopp SdkWorker in SdkHarness

2019-10-25 Thread Robert Bradshaw
I think we'll still need approach (2) for when the pipeline finishes
and a runner is tearing down workers.

On Fri, Oct 25, 2019 at 10:36 AM Maximilian Michels  wrote:
>
> Hi Jincheng,
>
> Thanks for bringing this up and capturing the ideas in the doc.
>
> Intuitively, I would have also considered adding a new Proto message for
> the teardown, but I think the idea to trigger this logic when the SDK
> Harness evicts process bundle descriptors is more elegant.
>
> Thanks,
> Max
>
> On 25.10.19 17:23, Luke Cwik wrote:
> > I like approach 3 since it doesn't add additional complexity to the API
> > and individual SDKs can choose to implement any clean-up strategy they
> > want or none at all which is the simplest.
> >
> > On Thu, Oct 24, 2019 at 8:46 PM jincheng sun  > > wrote:
> >
> > Hi,
> >
> > Thanks for your comments in doc, I have add Approach 3 which you
> > mentioned! @Luke
> >
> > For now, we should do a decision for Approach 3 and Approach 1.
> > Detail can be found in doc [1]
> >
> > Welcome anyone's feedback :)
> >
> > Regards,
> > Jincheng
> >
> > [1]
> > 
> > https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
> >
> > jincheng sun  > > 于2019年10月25日周五 上午10:40写道:
> >
> > Hi,
> >
> > Functionally capable of `abort`, but it will be called at the
> > end of operator. So, I prefer `dispose` semantics. i.e., all
> > normal logic has been executed.
> >
> > Best,
> > Jincheng
> >
> > Harsh Vardhan mailto:anan...@google.com>>
> > 于2019年10月23日周三 上午12:14写道:
> >
> > Would approach 1 be akin to abort semantics?
> >
> > On Mon, Oct 21, 2019 at 8:01 PM jincheng sun
> > mailto:sunjincheng...@gmail.com>>
> > wrote:
> >
> > Hi Luke,
> >
> > Thanks a lot for your reply. Since it allows to share
> > one SDK harness between multiple executable stages, the
> > control service termination may occur much later than
> > the completion of an executable stage. This is the main
> > reason I prefer runners to control the teardown of DoFns.
> >
> > Regarding to "SDK harnesses can terminate instances any
> > time they want and start new instances anytime as
> > well.", personally I think it's not conflict with the
> > proposed Approach 1 as the SDK harness could decide what
> > to do when receiving the teardown request. It could do
> > nothing if the DoFns has already been teared down and
> > could also tear down the DoFns if needed.
> >
> > What do you think?
> >
> > Best,
> > Jincheng
> >
> > Luke Cwik mailto:lc...@google.com>>
> > 于2019年10月22日周二 上午2:05写道:
> >
> > Approach 2 is currently the suggested approach[1]
> > for DoFn's to shutdown.
> > Note that SDK harnesses can terminate instances any
> > time they want and start new instances anytime as well.
> >
> > Why do you want to expose this logic so that Runners
> > could control it?
> >
> > 1:
> > 
> > https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#
> >
> > On Mon, Oct 21, 2019 at 4:27 AM jincheng sun
> >  > > wrote:
> >
> > Hi,
> > I found that in `SdkHarness` do not  stop the
> > `SdkWorker` when finish.  We should add the
> > logic for stop the `SdkWorker` in `SdkHarness`.
> > More detail can be found [1].
> >
> > There are two approaches to solve this issue:
> >
> > Approach 1:  We can add a Fn API for teardown
> > purpose and the runner will teardown a specific
> > bundle descriptor via this teardown Fn API
> > during disposing.
> > Approach 2: The control service termination
> > could be seen as a signal and once SDK harness
> > receives this signal, the teardown of the bundle
> > descriptor will be performed.
> >
> > More detail can be found in [2].
> >
> > As the Approach 2, SDK harness could be shared
> > between multiple executable stages. The control
> > service termination only occurs when all the
> >

Re: Python Precommit duration pushing 2 hours

2019-10-25 Thread Robert Bradshaw
It looks like fn_api_runner_test.py is quite expensive, taking 10-15+
minutes on each version of Python. This test consists of a base class
that is basically a validates runner suite, and is then run in several
configurations, many more of which (including some expensive ones)
have been added lately.

class FnApiRunnerTest(unittest.TestCase):
class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
class FnApiRunnerTestWithDisabledCaching(FnApiRunnerTest):
class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
class FnApiRunnerTestWithBundleRepeat(FnApiRunnerTest):
class FnApiRunnerTestWithBundleRepeatAndMultiWorkers(FnApiRunnerTest):

I'm not convinced we need to run all of these permutations, or at
least not all tests in all permutations.

On Fri, Oct 25, 2019 at 10:57 AM Valentyn Tymofieiev
 wrote:
>
> I took another look at this and precommit ITs are already running in 
> parallel, albeit in the same suite. However it appears Python precommits 
> became slower, especially Python 2 precommits [35 min per suite x 3 suites], 
> see [1]. Not sure yet what caused the increase, but precommits used to be 
> faster. Perhaps we have added a slow test or a lot of new tests.
>
> [1] https://scans.gradle.com/s/jvcw5fpqfc64k/timeline?task=ancsbov425524
>
> On Thu, Oct 24, 2019 at 4:53 PM Ahmet Altay  wrote:
>>
>> Ack. Separating precommit ITs to a different suite sounds good. Anyone is 
>> interested in doing that?
>>
>> On Thu, Oct 24, 2019 at 2:41 PM Valentyn Tymofieiev  
>> wrote:
>>>
>>> This should not increase the queue time substantially, since precommit ITs 
>>> are running sequentially with precommit tests, unlike multiple precommit 
>>> tests which run in parallel to each other.
>>>
>>> The precommit ITs we run are batch and streaming wordcount tests on Py2 and 
>>> one Py3 version, so it's not a lot of tests.
>>>
>>> On Thu, Oct 24, 2019 at 1:07 PM Ahmet Altay  wrote:

 +1 to separating ITs from precommit. Downside would be, when Chad tried to 
 do something similar [1] it was noted that the total time to run all 
 precommit tests would increase and also potentially increase the queue 
 time.

 Another alternative, we could run a smaller set of IT tests in precommits 
 and run the whole suite as part of post commit tests.

 [1] https://github.com/apache/beam/pull/9642

 On Thu, Oct 24, 2019 at 12:15 PM Valentyn Tymofieiev  
 wrote:
>
> One improvement could be move to Precommit IT tests into a separate suite 
> from precommit tests, and run it in parallel.
>
> On Thu, Oct 24, 2019 at 11:41 AM Brian Hulette  
> wrote:
>>
>> Python Precommits are taking quite a while now [1]. Just visually it 
>> looks like the average length is 1.5h or so, but it spikes up to 2h. 
>> I've had several precommit runs get aborted due to the 2 hour limit.
>>
>> It looks like there was a spike up above 1h back on 9/6 and the duration 
>> has been steadily rising since then. Is there anything we can do about 
>> this?
>>
>> Brian
>>
>> [1] 
>> http://104.154.241.245/d/_TNndF2iz/pre-commit-test-latency?orgId=1&from=now-90d&to=now&fullscreen&panelId=4


Re: JIRA priorities explaination

2019-10-25 Thread Robert Bradshaw
We cut a release every 6 weeks, according to schedule, making it easy
to plan for, and the release manager typically sends out a warning
email to remind everyone. I don't think it makes sense to do that for
every ticket. Blockers should be reserved for things we really
shouldn't release without.

On Fri, Oct 25, 2019 at 11:33 AM Pablo Estrada  wrote:
>
> I mentioned on the PR that I had been using the 'blocker' priority along with 
> the 'fix version' field to mark issues that I want to get in the release.
> Of course, this little practice of mine only matters much around release 
> branch cutting time - and has been useful for me to track which things I want 
> to ensure getting into the release / bump to the next /etc.
> I've also found it to be useful as a way to communicate with the release 
> manager without having to sync directly.
>
> What would be a reasonable way to tell the release manager "I'd like to get 
> this feature in. please talk to me if you're about to cut the branch" - that 
> also uses the priorities appropriately? - and that allows the release manager 
> to know when a fix version is "more optional" / "less optional"?
>
> On Wed, Oct 23, 2019 at 12:20 PM Kenneth Knowles  wrote:
>>
>> I finally got around to writing some of this up. It is minimal. Feedback is 
>> welcome, especially if what I have written does not accurately represent the 
>> community's approach.
>>
>> https://github.com/apache/beam/pull/9862
>>
>> Kenn
>>
>> On Mon, Feb 11, 2019 at 3:21 PM Daniel Oliveira  
>> wrote:
>>>
>>> Ah, sorry, I missed that Alex was just quoting from our Jira installation 
>>> (didn't read his email closely enough). Also I wasn't aware about those 
>>> pages on our website.
>>>
>>> Seeing as we do have definitions for our priorities, I guess my main 
>>> request would be that they be made more discoverable somehow. I don't think 
>>> the tooltips are reliable, and the pages on the website are informative, 
>>> but hard to find. Since it feels a bit lazy to say "this isn't discoverable 
>>> enough" without suggesting any improvements, I'd like to propose these two 
>>> changes:
>>>
>>> 1. We should write a Beam Jira Guide with basic information about our Jira. 
>>> I think the bug priorities should go in here, but also anything else we 
>>> would want someone to know before filing any Jira issues, like how our 
>>> components are organized or what the different issue types mean. This guide 
>>> could either be written in the website or the wiki, but I think it should 
>>> definitely be linked in https://beam.apache.org/contribute/ so that 
>>> newcomers read it before getting their Jira account approved. The goal here 
>>> being to have a reference for the basics of our Jira since at the moment it 
>>> doesn't seem like we have anything for this.
>>>
>>> 2. The existing info on Post-commit and pre-commit policies doesn't seem 
>>> very discoverable to someone monitoring the Pre/Post-commits. I've reported 
>>> a handful of test-failures already and haven't seen this link mentioned 
>>> much. We should try to find a way to funnel people towards this link when 
>>> there's an issue, the same way we try to funnel people towards the 
>>> contribution guide when they write a PR. As a note, while writing this 
>>> email I remembered this link that someone gave me before 
>>> (https://s.apache.org/beam-test-failure). That mentions the Post-commit 
>>> policies page, so maybe it's just a matter of pasting that all over our 
>>> Jenkins builds whenever we have a failing test?
>>>
>>> PS: I'm also definitely for SLOs, but I figure it's probably better 
>>> discussed in a separate thread so I'm trying to stick to the subject of 
>>> priority definitions.
>>>
>>> On Mon, Feb 11, 2019 at 9:17 AM Scott Wegner  wrote:

 Thanks for driving this discussion. I also was not aware of these existing 
 definitions. Once we agree on the terms, let's add them to our Contributor 
 Guide and start using them.

 +1 in general; I like both Alex and Kenn's definitions; Additional 
 wordsmithing could be moved to a Pull Request. Can we make the definitions 
 useful for both the person filing a bug, and the assignee, i.e.

 : . 
 

 On Sun, Feb 10, 2019 at 7:49 PM Kenneth Knowles  wrote:
>
> The content that Alex posted* is the definition from our Jira 
> installation anyhow.
>
> I just searched around, and there's 
> https://community.atlassian.com/t5/Jira-questions/According-to-Jira-What-is-Blocker-Critical-Major-Minor-and/qaq-p/668774
>  which makes clear that this is really user-defined, since Jira has many 
> deployments with their own configs.
>
> I guess what I want to know about this thread is what action is being 
> proposed?
>
> Previously, there was a thread that resulted in 
> https://beam.apache.org/contribute/precommit-policies/ and 
> https://beam.apache.org/contribute/postcommits-policies/. These

Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-24 Thread Robert Bradshaw
Yes, there are plans to support streaming for interactive beam. David
Yan (cc'd) is leading this effort.

On Thu, Oct 24, 2019 at 1:50 PM Harsh Vardhan  wrote:
>
> Thanks, +1 to adding support for streaming on Interactive Beam (+David Yan)
>
>
> On Thu, Oct 24, 2019 at 1:45 PM Hai Lu  wrote:
>>
>> Hi Robert,
>>
>> We're trying out iBeam at LinkedIn for Python. As Igor mentioned, there 
>> seems to be some inconsistency in the behavior of interactive beam. We can 
>> suggest some fixes from our end but we would need some support from the 
>> community.
>>
>> Also, is there a plan to support iBeam for streaming mode? We're interested 
>> in that use case as well.
>>
>> Thanks,
>> Hai
>>
>> On Mon, Oct 21, 2019 at 4:45 PM Robert Bradshaw  wrote:
>>>
>>> Thanks for trying this out. Yes, this is definitely something that
>>> should be supported (and tested).
>>>
>>> On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
>>> >
>>> > Hi everyone,
>>> >
>>> > The interactive beam example using the DirectRunner fails after execution 
>>> > of the last cell. The recursion limit is exceeded during the calculation 
>>> > of the cache label because of a circular reference in the PipelineInfo 
>>> > object.
>>> >
>>> > The constructor for the PipelineInfo class creates a mapping from each 
>>> > pcollection to the transforms that produce and consume it. The issue 
>>> > arises when there exists a transform that is both a producer and a 
>>> > consumer for the same pcollection. This occurs when a transform's expand 
>>> > method returns the same pcoll object that's passed into it. The specific 
>>> > transform causing the failure of the example is MaybeReshuffle, which is 
>>> > used in the Create transform. Replacing "return pcoll" with "return pcoll 
>>> > | Map(lambda x: x)" seems to fix the problem.
>>> >
>>> > A workaround for this issue on the interactive beam side would be fairly 
>>> > simple, but it seems to me that there should be more validation of 
>>> > pipelines to prevent the use of transforms that return the same pcoll 
>>> > that's passed in, or at least a mention of this in the transform style 
>>> > guide. My understanding is that pcollections are produced by a single 
>>> > transform (they even have a field called "producer" that references only 
>>> > one transform). If that's the case then that property of pcollections 
>>> > should be enforced.
>>> >
>>> > I made ticket BEAM-8451 to track this issue.
>>> >
>>> > I'm still new to beam so I apologize if I'm fundamentally 
>>> > misunderstanding something. I'm not exactly sure what the next step 
>>> > should be and would appreciate some recommendations. I can submit a PR to 
>>> > solve the immediate problem of the failing example but the underlying 
>>> > problem should also be addressed at some point. I also apologize if 
>>> > people are already aware of this problem.
>>> >
>>> > Thank You!
>>> > Igor Durovic


Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-21 Thread Robert Bradshaw
Thanks for trying this out. Yes, this is definitely something that
should be supported (and tested).

On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
>
> Hi everyone,
>
> The interactive beam example using the DirectRunner fails after execution of 
> the last cell. The recursion limit is exceeded during the calculation of the 
> cache label because of a circular reference in the PipelineInfo object.
>
> The constructor for the PipelineInfo class creates a mapping from each 
> pcollection to the transforms that produce and consume it. The issue arises 
> when there exists a transform that is both a producer and a consumer for the 
> same pcollection. This occurs when a transform's expand method returns the 
> same pcoll object that's passed into it. The specific transform causing the 
> failure of the example is MaybeReshuffle, which is used in the Create 
> transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)" 
> seems to fix the problem.
>
> A workaround for this issue on the interactive beam side would be fairly 
> simple, but it seems to me that there should be more validation of pipelines 
> to prevent the use of transforms that return the same pcoll that's passed in, 
> or at least a mention of this in the transform style guide. My understanding 
> is that pcollections are produced by a single transform (they even have a 
> field called "producer" that references only one transform). If that's the 
> case then that property of pcollections should be enforced.
>
> I made ticket BEAM-8451 to track this issue.
>
> I'm still new to beam so I apologize if I'm fundamentally misunderstanding 
> something. I'm not exactly sure what the next step should be and would 
> appreciate some recommendations. I can submit a PR to solve the immediate 
> problem of the failing example but the underlying problem should also be 
> addressed at some point. I also apologize if people are already aware of this 
> problem.
>
> Thank You!
> Igor Durovic


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Robert Bradshaw
I just merged https://github.com/apache/beam/pull/9845 which should
resolve the issue.

On Mon, Oct 21, 2019 at 12:58 PM Chad Dombrova  wrote:
>
> thanks!
>
> On Mon, Oct 21, 2019 at 12:47 PM Kyle Weaver  wrote:
>>
>> This issue is being tracked at 
>> https://issues.apache.org/jira/browse/BEAM-8416.
>>
>> On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova  wrote:
>>>
>>> Hi all,
>>> Is anyone else getting these errors in 
>>> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
>>>
>>> They seem to be taking two forms:
>>>
>>> zipfile.BadZipFile: Bad CRC-32 for file 
>>> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
>>>
>>> zipfile.BadZipFile: Bad magic number for file header
>>>
>>> Here are some gradle scans:
>>>
>>> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
>>>
>>> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
>>>
>>> I got it to go through eventually after 4 tries.
>>>
>>> -chad


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the test should be fixed.

On Mon, Oct 21, 2019 at 11:20 AM Jan Lukavský  wrote:
>
> Hi Robert,
>
> I though it would be that case. ParDoLifecycleTest, however, does not
> currently allow for empty bundles. We have currently worked around this
> in Flink by avoiding the creation of these bundles, but maybe the test
> should be modified so that it adheres to the model [1].
>
> Jan
>
> [1] https://github.com/apache/beam/pull/9846
>
> On 10/21/19 6:00 PM, Robert Bradshaw wrote:
> > Yes, the model allows them.
> >
> > It also takes less work to avoid them in general (e.g. imagine one
> > reshuffles N elements to M > N workers. A priori, one would "start" a
> > bundle and then try to read all data destined for that
> > worker--postponing this until one knows that the set of data for this
> > worker could be an optimization (as could not doing so as a form of
> > speculative execution) but should not be necessary.
> >
> > - Robert
> >
> > On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
> >> Hi Max,
> >>
> >> that is true, but then we have two orthogonal issues:
> >>
> >>a) correctness - if empty bundles are aligned with the model, then
> >> validates runner tests should take that into account
> >>
> >>b) performance - that can be dealt with in separate JIRA issue, if 
> >> needed
> >>
> >> WDYT?
> >>
> >> Jan
> >>
> >> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> >>> Hi Jan,
> >>>
> >>> I think it is aligned with the model to create empty bundles. The
> >>> question if course, whether it is preferable to avoid them, since the
> >>> Setup/Finish state might be costly, depending on the bundle size and
> >>> the type of DoFn used.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 21.10.19 14:13, Kyle Weaver wrote:
> >>>> Nevermind, this is discussed on the PR linked.
> >>>>
> >>>> On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  >>>> <mailto:kcwea...@google.com>> wrote:
> >>>>
> >>>>  Do you know why an empty bundle might be created?
> >>>>
> >>>>  On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  >>>>  <mailto:je...@seznam.cz>> wrote:
> >>>>
> >>>>  Hi,
> >>>>
> >>>>  when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
> >>>>  found a
> >>>>  situation, where Flink might create empty bundle - i.e. call
> >>>>  @StartBundle immediately followed by @FinishBundle, with no
> >>>>  elements
> >>>>  inside the bundle. That is what breaks the ParDoLifecycleTest,
> >>>>  because
> >>>>  the test explicitly assumes, that the sequence of lifecycle
> >>>> methods
> >>>>  should be StartBundle -> Process Element -> Finish Bundle. It is
> >>>>  easy to
> >>>>  modify the test to accept situation of StartBundle ->
> >>>>  FinishBundle with
> >>>>  no elements ([1]), but the question is, is this allowed by the
> >>>>  model? I
> >>>>  think there is no reason not to be, but I'd like to be sure.
> >>>>
> >>>>  Thanks,
> >>>>
> >>>> Jan
> >>>>
> >>>>  [1] https://github.com/apache/beam/pull/9841
> >>>>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the model allows them.

It also takes less work to avoid them in general (e.g. imagine one
reshuffles N elements to M > N workers. A priori, one would "start" a
bundle and then try to read all data destined for that
worker--postponing this until one knows that the set of data for this
worker could be an optimization (as could not doing so as a form of
speculative execution) but should not be necessary.

- Robert

On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
>
> Hi Max,
>
> that is true, but then we have two orthogonal issues:
>
>   a) correctness - if empty bundles are aligned with the model, then
> validates runner tests should take that into account
>
>   b) performance - that can be dealt with in separate JIRA issue, if needed
>
> WDYT?
>
> Jan
>
> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > I think it is aligned with the model to create empty bundles. The
> > question if course, whether it is preferable to avoid them, since the
> > Setup/Finish state might be costly, depending on the bundle size and
> > the type of DoFn used.
> >
> > Cheers,
> > Max
> >
> > On 21.10.19 14:13, Kyle Weaver wrote:
> >> Nevermind, this is discussed on the PR linked.
> >>
> >> On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  >> > wrote:
> >>
> >> Do you know why an empty bundle might be created?
> >>
> >> On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  >> > wrote:
> >>
> >> Hi,
> >>
> >> when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
> >> found a
> >> situation, where Flink might create empty bundle - i.e. call
> >> @StartBundle immediately followed by @FinishBundle, with no
> >> elements
> >> inside the bundle. That is what breaks the ParDoLifecycleTest,
> >> because
> >> the test explicitly assumes, that the sequence of lifecycle
> >> methods
> >> should be StartBundle -> Process Element -> Finish Bundle. It is
> >> easy to
> >> modify the test to accept situation of StartBundle ->
> >> FinishBundle with
> >> no elements ([1]), but the question is, is this allowed by the
> >> model? I
> >> think there is no reason not to be, but I'd like to be sure.
> >>
> >> Thanks,
> >>
> >>Jan
> >>
> >> [1] https://github.com/apache/beam/pull/9841
> >>


Re: Python SDK timestamp precision

2019-10-18 Thread Robert Bradshaw
On Fri, Oct 18, 2019 at 2:36 PM Luke Cwik  wrote:
>
> Robert it seems like your for Plan A. Assuming we go forward with nanosecond 
> and based upon your analysis in 3), wouldn't that mean we would have to make 
> a breaking change to the Java SDK to swap to nanosecond precision?

Yes. But regardless I think we want to migrate from JodaTime (millis)
to Java's Instant (nanos). We'll likely have to keep the JodaTime
variant around (deprecated, with the caveat that it truncates if any
of your pipeline has sub-millisecond handling).

> On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw  wrote:
>>
>> TL;DR: We should just settle on nanosecond precision ubiquitously for 
>> timestamp/windowing in Beam.
>>
>>
>> Re-visiting this discussion in light of cross-language transforms and 
>> runners, and trying to tighten up testing. I've spent some more time 
>> thinking about how we could make these operations granularity-agnostic, but 
>> just can't find a good solution. In particular, the sticklers seem to be:
>>
>> (1) Windows are half-open intervals, and the timestamp associated with a 
>> window coming out of a GBK is (by default) as large as possible but must 
>> live in that window. (Otherwise WindowInto + GBK + WindowInto would have the 
>> unforunate effect of moving aggregate values into subsequent windows, which 
>> is clearly not the intent.) In other words, the timestamp of a grouped value 
>> is basically End(Window) - epsilon. Unless we choose a representation able 
>> to encode "minus epsilon" we must agree on a granularity.
>>
>> (2) Unless we want to have multiple vairants of all our WindowFns (e.g. 
>> FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a 
>> granularity with which to parameterize these well-known operations. There 
>> are cases (e.g. side input window mapping, merging) where these Fns may be 
>> used downstream in contexts other than where they are applied/defined.
>>
>> (3) Reification of the timestamp into user-visible data, and the other way 
>> around, require a choice of precision to expose to the user. This means that 
>> the timestamp is actual data, and truncating/rounding cannot be done 
>> implicitly. Also round trip of reification and application of timestamps 
>> should hopefully be idempotent no matter the SDK.
>>
>> The closest I've come is possibly parameterizing the timestamp type, where 
>> encoding, decoding (including pulling the end out of a window?), comparison 
>> (against each other and a watermark), "minus epsilon", etc could be UDFs. 
>> Possibly we'd need the full set of arithmetic operations to implement 
>> FixedWindows on an unknown timestamp type. Reification would simply be 
>> dis-allowed (or return an opaque rather than SDK-native) type if the SDK did 
>> not know that window type. The fact that one might need comparison between 
>> timestamps of different types, or (lossless) coercion from one type to 
>> another, means that timestamp types need to know about each other, or 
>> another entity needs to know about the full cross-product, unless there is a 
>> common base-type (at which point we might as well always choose that).
>>
>> An intermediate solution is to settle on floating (decimal) point 
>> representation, plus a "minus-epsiloin" bit. It wouldn't quite solve the 
>> mapping through SDK-native types (which could require rounding or errors or 
>> a new opaque type, and few date librarys could faithfully expose the minus 
>> epsilon part). It might also be more expensive (compute and storage), and 
>> would not allow us to use the protofuf timestamp/duration fields (or any 
>> standard date/time libraries).
>>
>> Unless we can come up with a clean solution to the issues above shortly, I 
>> think we should fix a precision and move forward. If this makes sense to 
>> everyone, then we can start talking about the specific choice of precision 
>> and a migration path (possibly only for portability).
>>
>>
>> For reference, the manipulations we do on timestamps are:
>>
>> WindowInto: Timestamp -> Window
>> TimestampCombine: Window, [Timestamp] -> Timestamp
>> End(Window)
>> Min(Timestamps)
>> Max(Timestamps)
>> PastEndOfWindow: Watermark, Window -> {True, False}
>>
>> [SideInput]WindowMappingFn: Window -> Window
>> WindowInto(End(Window))
>>
>> GetTimestamp: Timestamp -> SDK Native Object
>> EmitAtTimestamp: SDK Native Object -> Timestamp
>>
>>
>>
>>
>>
>>
>

Re: Python SDK timestamp precision

2019-10-18 Thread Robert Bradshaw
TL;DR: We should just settle on nanosecond precision ubiquitously for
timestamp/windowing in Beam.


Re-visiting this discussion in light of cross-language transforms and
runners, and trying to tighten up testing. I've spent some more time
thinking about how we could make these operations granularity-agnostic, but
just can't find a good solution. In particular, the sticklers seem to be:

(1) Windows are half-open intervals, and the timestamp associated with a
window coming out of a GBK is (by default) as large as possible but must
live in that window. (Otherwise WindowInto + GBK + WindowInto would have
the unforunate effect of moving aggregate values into subsequent windows,
which is clearly not the intent.) In other words, the timestamp of a
grouped value is basically End(Window) - epsilon. Unless we choose a
representation able to encode "minus epsilon" we must agree on a
granularity.

(2) Unless we want to have multiple vairants of all our WindowFns (e.g.
FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
granularity with which to parameterize these well-known operations. There
are cases (e.g. side input window mapping, merging) where these Fns may be
used downstream in contexts other than where they are applied/defined.

(3) Reification of the timestamp into user-visible data, and the other way
around, require a choice of precision to expose to the user. This means
that the timestamp is actual data, and truncating/rounding cannot be done
implicitly. Also round trip of reification and application of timestamps
should hopefully be idempotent no matter the SDK.

The closest I've come is possibly parameterizing the timestamp type, where
encoding, decoding (including pulling the end out of a window?), comparison
(against each other and a watermark), "minus epsilon", etc could be UDFs.
Possibly we'd need the full set of arithmetic operations to implement
FixedWindows on an unknown timestamp type. Reification would simply be
dis-allowed (or return an opaque rather than SDK-native) type if the SDK
did not know that window type. The fact that one might need comparison
between timestamps of different types, or (lossless) coercion from one type
to another, means that timestamp types need to know about each other, or
another entity needs to know about the full cross-product, unless there is
a common base-type (at which point we might as well always choose that).

An intermediate solution is to settle on floating (decimal) point
representation, plus a "minus-epsiloin" bit. It wouldn't quite solve the
mapping through SDK-native types (which could require rounding or errors or
a new opaque type, and few date librarys could faithfully expose the minus
epsilon part). It might also be more expensive (compute and storage), and
would not allow us to use the protofuf timestamp/duration fields (or any
standard date/time libraries).

Unless we can come up with a clean solution to the issues above shortly, I
think we should fix a precision and move forward. If this makes sense to
everyone, then we can start talking about the specific choice of precision
and a migration path (possibly only for portability).


For reference, the manipulations we do on timestamps are:

WindowInto: Timestamp -> Window
TimestampCombine: Window, [Timestamp] -> Timestamp
End(Window)
Min(Timestamps)
Max(Timestamps)
PastEndOfWindow: Watermark, Window -> {True, False}

[SideInput]WindowMappingFn: Window -> Window
WindowInto(End(Window))

GetTimestamp: Timestamp -> SDK Native Object
EmitAtTimestamp: SDK Native Object -> Timestamp






On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw  wrote:

> On Thu, May 9, 2019 at 9:32 AM PM Kenneth Knowles  wrote:
>
> > From: Robert Bradshaw 
> > Date: Wed, May 8, 2019 at 3:00 PM
> > To: dev
> >
> >> From: Kenneth Knowles 
> >> Date: Wed, May 8, 2019 at 6:50 PM
> >> To: dev
> >>
> >> >> The end-of-window, for firing, can be approximate, but it seems it
> >> >> should be exact for timestamp assignment of the result (and similarly
> >> >> with the other timestamp combiners).
> >> >
> >> > I was thinking that the window itself should be stored as exact data,
> while just the firing itself is approximated, since it already is, because
> of watermarks and timers.
> >>
> >> I think this works where we can compare encoded windows, but some
> >> portable interpretation of windows is required for runner-side
> >> implementation of merging windows (for example).
> >
> > But in this case, you've recognized the URN of the WindowFn anyhow, so
> you understand its windows. Remembering that IntervalWindow is just one
> choice, and that windows themselves are totally user-defined and that
> merging logic is complete

Re: RFC: Assigning environments to transforms in a pipeline

2019-10-16 Thread Robert Bradshaw
Sounds nice. Is there a design doc (or, perhaps, you could just give an
example of what this would look like in this thread)?

On Wed, Oct 16, 2019 at 5:51 PM Chad Dombrova  wrote:

> Hi all,
> One of our goals for the portability framework is to be able to assign
> different environments to different segments of a pipeline.  This is not
> possible right now because environments are a concept that really only
> exist in the portable runner as protobuf messages:  they lack a proper API
> on the pipeline definition side of things.
>
> As a first step toward our goal, one of our team members just created a
> PR[1] exposing environments as a proper class hierarchy, akin to
> PTransforms, PCollections, Coders, etc. It's quite straightforward, and we
> were careful to adhere to existing patterns for similar types, so hopefully
> the end result feels natural.  After this PR is merged, our next step will
> be to create a proposal for assigning environments to transforms.
>
> Let us know what you think!
>
> -chad
>
> [1] https://github.com/apache/beam/pull/9811
>
>


Re: [design] A streaming Fn API runner for Python

2019-10-15 Thread Robert Bradshaw
Very excited to see this! I've added some comments to the doc.

On Tue, Oct 15, 2019 at 3:43 PM Pablo Estrada  wrote:

> I've just been informed that access wasn't open. I've since opened access
> to it.
> Thanks
> -P.
>
> On Tue, Oct 15, 2019 at 2:10 PM Pablo Estrada  wrote:
>
>> Hello all,
>> I am planning to work on removing the old BundleBasedDirectRunner, and
>> expand the FnApiRunner to work on streaming as well as batch.
>> Currently, the FnApiRunner orders the processing graph topologically, and
>> "pushes" all the data through each stage in topological order (deferred
>> inputs such as residuals and timers are immediately pushed to the SDK as
>> well).
>> The new design would change from this
>> push-all-data-through-topologically-sorted-stages model to having queues
>> for "bundles", or for elements that are awaiting processing, and routing
>> them to the appropriate bundle processing subgraph.
>>
>> The design is here: http://s.apache.org/streaming-fn-runner-py
>>
>> I expect
>>
>> I'd appreciate comments and everything : )
>> Best
>> -P.
>>
>


Re: So much green

2019-10-11 Thread Robert Bradshaw
Very nice to see, thanks for sharing.

On Fri, Oct 11, 2019 at 5:44 AM Maximilian Michels  wrote:
>
> Glad to see that we have fixed the recent flakes. Let's keep up the good
> work :)
>
> -Max
>
> On 10.10.19 23:37, Kenneth Knowles wrote:
> > All the cells in the pull request template are green right now and have
> > been for most of the afternoon & evening. I just thought I would share
> > this wonderful fact with everyone. Seeing it really makes an impression!
> >
> > Kenn


Re: Python thread pool executor for Apache Beam

2019-10-11 Thread Robert Bradshaw
Can we use a lower default timeout to mitigate this issue in the short
term (I'd imagine one second or possibly smaller would be sufficient
for our use), and get a fix upstream in the long term?

On Fri, Oct 11, 2019 at 9:38 AM Luke Cwik  wrote:
>
> I'm looking for a thread pool that re-uses threads that are idle before 
> creating new ones and has an API that is compatible with the 
> concurrent.futures ThreadPoolExecutor[1].
>
> To my knowledge, the concurrent.futures ThreadPool creates new threads for 
> tasks up until the thread pool limit before re-using existing ones for all 
> Python versions prior to 3.8.
>
> I tried using CollapsingThreadPoolExecutor within pr/9477[2] but after 
> testing it with Apache Beam, I found that it has some pool shutdown issues[3].
>
> Does anyone have any suggestions for a good Python library that contains a 
> stable thread pool implementation?
>
> Preferably the library that provides the thread pool would have no 
> dependencies and be compatible with the same Python versions that Apache Beam 
> is compatible with today.
>
> 1: 
> https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
> 1: https://github.com/apache/beam/pull/9477
> 2: https://github.com/ftpsolutions/collapsing-thread-pool-executor/issues/3


Re: Beam Python fails to run on macOS 10.15?

2019-10-10 Thread Robert Bradshaw
Looks like an issue with the protobuf library. Do you know what
version of protobuf you're using? (E.g. by running pip freeze.)

I don't have Catalina to test this on, but it'd be useful if you could
winnow this down to the import that fails.

On Thu, Oct 10, 2019 at 8:15 AM Kamil Wasilewski
 wrote:
>
> Hi all,
>
> I've recently updated my macOS to 10.15 Catalina. Since then, I have the 
> following error when I try to import apache_beam package (both in python 2.7 
> and 3.x):
>
> >>> import apache_beam
> [libprotobuf ERROR google/protobuf/descriptor_database.cc:58] File already 
> exists in database:
> [libprotobuf FATAL google/protobuf/descriptor.cc:1370] CHECK failed: 
> GeneratedDatabase()->Add(encoded_file_descriptor, size):
> libc++abi.dylib: terminating with uncaught exception of type 
> google::protobuf::FatalException: CHECK failed: 
> GeneratedDatabase()->Add(encoded_file_descriptor, size):
>
> [1]43669 abort  python
>
> Does anyone has the same problem? I saw that someone else had the same error 
> at the beam-python slack channel, so I guess this problem is not limited to 
> my workstation.
>
> Thanks,
> Kamil


Re: [DISCUSS] JIRA open/close emails?

2019-10-10 Thread Robert Bradshaw
I would also propose that we keep this traffic off of dev@--I don't
think the average dev@ subscriber would want to see these and for
those that do there are alternative ways of getting them (e.g. custom
email notifications, or issues@ with filtering).

On Thu, Oct 10, 2019 at 8:30 AM Kyle Weaver  wrote:
>
> Customized email notifications from Jira +1. Jira notifications will be 
> useful to some, but not all, so they should be opt-in rather than filter-out. 
> I spend enough time filtering internal spam as is :/
>
> On Thu, Oct 10, 2019 at 8:12 AM Chad Dombrova  wrote:
>>
>>
>>
>> On Thu, Oct 10, 2019 at 10:56 AM Maximilian Michels  wrote:
>>>
>>> Our mailing list already has decent traffic. The current solution is
>>> better for its readability. People would simply adapt to more emails by
>>> creating a filter or ignoring them.
>>
>>
>>
>> +1. I find the dev generally informative, but as a part-time external 
>> contributor open/close emails would detract from its usefulness for me.
>>
>> As an alternative to a dedicated mailing list for this purpose, we could 
>> create a wiki article on how to setup customized email notifications from 
>> Jira, with an example specifically for open/close events.  That’s something 
>> I might actually use, but only with additional filters like “sdk-python”.
>>
>> -chad
>>
>>
>>>
>>>
>>> I know that the triaging of issues can be slow. How about a dedicated
>>> mailing for only open/close notifications, and a weekly / monthly
>>> summary of untriaged issues to the dev mailing list?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 09.10.19 21:58, Thomas Weise wrote:
>>> > Depends on JIRA volume, also. There are projects where the dev@ is
>>> > heavily populated with create JIRA notifications (and other traffic goes
>>> > under a bit).
>>> >
>>> > I'm generally in favor of making the creation of JIRAs more visible. But
>>> > if there isn't broad enough interest these notifications can still be
>>> > surfaced individually by filtering.
>>> >
>>> > With "close" you probably mean resolved? That's kind of nice to know
>>> > when something is complete, but I'm unsure if it should go to dev@,
>>> > because create provided the opportunity for interested parties to watch
>>> > the JIRA they care about.
>>> >
>>> >
>>> > On Wed, Oct 9, 2019 at 8:58 PM Manu Zhang >> > > wrote:
>>> >
>>> > +1. Like the subscribe to only "close issue" action instead of each
>>> > comment on GitHub
>>> >
>>> > Manu
>>> >
>>> > On Thu, Oct 10, 2019 at 10:51 AM Kenneth Knowles >> > > wrote:
>>> >
>>> > Currently, all email from JIRA goes to iss...@beam.apache.org
>>> > .
>>> >
>>> > I just learned that HBase has a JIRA configuration so that issue
>>> > open/close goes to dev@ and every other notification goes to
>>> > issues@. That actually seems nice for involving all of the
>>> > community in new issue triage and also closed issue
>>> > verification/notification.
>>> >
>>> > What do you think about such a system?
>>> >
>>> > Kenn
>>> >


Re: [spark structured streaming runner] merge to master?

2019-10-10 Thread Robert Bradshaw
On Thu, Oct 10, 2019 at 12:39 AM Etienne Chauchot  wrote:
>
> Hi guys,
>
> You probably know that there has been for several months an work
> developing a new Spark runner based on Spark Structured Streaming
> framework. This work is located in a feature branch here:
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>
> To attract more contributors and get some user feedback, we think it is
> time to merge it to master. Before doing so, some steps need to be achieved:
>
> - finish the work on spark Encoders (that allow to call Beam coders)
> because, right now, the runner is in an unstable state (some transforms
> use the new way of doing ser/de and some use the old one, making a
> pipeline incoherent toward serialization)
>
> - clean history: The history contains commits from November 2018, so
> there is a good amount of work, thus a consequent number of commits.
> They were already squashed but not from September 2019

I don't think the number of commits should be an issue--we shouldn't
just squash years worth of history away. (OTOH, if this is a case of
this branch containing lots of little, irrelevant commits that would
have normally been squashed away in the normal review process we do
for the main branch, then, yes, some cleanup could be nice.)

> Regarding status:
>
> - the runner passes 89% of the validates runner tests in batch mode. We
> hope to pass more with the new Encoders
>
> - Streaming mode is barely started (waiting for the multi-aggregations
> support in spark SS framework from the Spark community)
>
> - Runner can execute Nexmark
>
> - Some things are not wired up yet
>
>  - Beam Schemas not wired with Spark Schemas
>
>  - Optional features of the model not implemented:  state api, timer
> api, splittable doFn api, …
>
> WDYT, can we merge it to master once the 2 steps are done ?

I think that as long as it sits parallel to the existing runner, and
is clearly marked with its status, it makes sense to me. How many
changes does it make to the existing codebase (as opposed to add new
code)?


Re: [portability] Removing the old portable metrics API...

2019-10-09 Thread Robert Bradshaw
To clarify on the Dataflow story, there are three Dataflow (worker)
implementations: the legacy non-portable one, the Portability Java Runner
Harness (currently used by Python Streaming), and the Portability Unified
Worker (intended to replace the Java Runner Harness). MonitoringInfos
doesn't apply to the legacy one, is implemented for the JRH, but the UW
still uses the old-style counters.

So, I think we can't delete the old-style yet, but hopefully soon (pending
Go + UW support--it's on the roadmaps but I don't have an ETA).

On Wed, Oct 9, 2019 at 11:40 AM Alex Amato  wrote:

> @Robert Bradshaw  Dataflow is updated to use
> MonitoringInfos.
>
> This is specifically referring to the FN API Layer. Beam Python and Beam
> Java export metrics using the new APIs. And the DataflowRunner harness is
> consuming and using those. When I was removed from that project, most of
> the metrics were implemented in the
> Python and Java SDKs as MonitoringInfos.
>
>
>
> Java SDK
>
> Python SDK
>
> Go SDK
>
> User Counters
>
> Done
>
> Done
>
> Legacy FN API
>
> User Distributions
>
> Done
>
> Done
>
> Legacy FN API
>
> Execution Time Start
>
> Done
>
> Done
>
> Not Started
>
> Execution Time Process()
>
> Done
>
> Done
>
> Not Started
>
> Execution Time Finish()
>
> Done
>
> Done
>
> Not Started
>
> Element Count
>
> Done
>
> Done
>
> Legacy FN API
>
> Sampled PColl Byte Size
>
> Pending (PR/8416 <https://github.com/apache/beam/pull/8416>)
>
> Handoff instructions
>
> BEAM-7462 <https://issues.apache.org/jira/browse/BEAM-7462?filter=-2>
>
> Done
>
> Legacy FN API
>
> And the Dataflow Java Runner Harness was consuming this. +Mikhail
> Gryzykhin  implemented the runner harness layer.
>
> Do delete the deprecated stuff, we would need to get the Go SDK on
> MonitoringInfos for what it has implemented so far.
>
> Integration test coverage could be increased. But we wrote this test
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py>
> .
>
>
> On Wed, Oct 9, 2019 at 10:51 AM Luke Cwik  wrote:
>
>> One way would be to report both so this way we don't need to update the
>> Dataflow Java implementation but other runners using the new API get all
>> the metrics.
>>
>> On Mon, Oct 7, 2019 at 10:00 AM Robert Bradshaw 
>> wrote:
>>
>>> Yes, Dataflow still uses the old API, for both counters and for its
>>> progress/autoscaling mechanisms. We'd need to convert that over as
>>> well (which is on the TODO list but lower than finishing up support
>>> for portability in general).
>>>
>>> On Mon, Oct 7, 2019 at 9:56 AM Robert Burke  wrote:
>>> >
>>> > The Go SDK uses the old API [1], but it shouldn't be too hard to
>>> migrate it.
>>> >
>>> > The main thing I'd want to do at the same time is move the
>>> dependencies on the protos out of that package and have those live only in
>>> the harness package [2]. I wasn't aware of that particular separation of
>>> concerns until much later, but allows for alternative harness
>>> implementations.
>>> >
>>> > I have some other work to get the Per-DoFn profiling metrics (eleemnt
>>> count, size, time) into the Go SDK this quarter, so I can handle this then.
>>> >
>>> > [1]
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/metrics/metrics.go#L474
>>> > [2]
>>> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/core/runtime/harness
>>> >
>>> > On Fri, Oct 4, 2019 at 6:14 PM Pablo Estrada 
>>> wrote:
>>> >>
>>> >> Hello devs,
>>> >> I recently took a look at how Dataflow is retrieving metrics from the
>>> Beam SDK harnesses, and noticed something. As you may (or may not)
>>> remember, the portability API currently has two ways of reporting metrics.
>>> Namely, the newer MonitoringInfo API[1], and the older Metrics one[2].
>>> >>
>>> >> This is somewhat troublesome because now we have two things that do
>>> the same thing. The SDKs report double the amount of metrics[3][4], and I
>>> bet it's confusing for runner implementers.
>>> >>
>>> >> Luckily, it seems like the Flink and Spark runners do use the new API
>>> [5][6] - yay! : ) - so I guess then the only runner that uses the old API
>>> is Dataflow

Re: Please comment on draft comms strategy by Oct 16

2019-10-09 Thread Robert Bradshaw
Probably worth mentioning Slack and StackOverflow as well.

On Wed, Oct 9, 2019 at 3:59 PM María Cruz  wrote:
>
> Hi all,
> sorry for multiple messages. I realized after sending the first email that a 
> new thread with a different subject was probably more efficient.
>
> I created a communication strategy draft. To start, I did a map of Beam 
> channels and content, and I have some questions for you: 
> https://github.com/macruzbar/beam/blob/master/Communication-strategy-DRAFT.md
>
> In order to create these files, I forked the repo. Once this looks good, and 
> if everyone agrees, we can merge the changes to apache/beam.
>
> I didn't assign reviewers for this file because I don't know if there is 
> someone who usually looks at these kinds of documents. So everyone: please 
> feel free to pitch in! I will give this a week for comments.
>
> Looking forward to your comments!
>
> María


Re: [portability] Removing the old portable metrics API...

2019-10-07 Thread Robert Bradshaw
Yes, Dataflow still uses the old API, for both counters and for its
progress/autoscaling mechanisms. We'd need to convert that over as
well (which is on the TODO list but lower than finishing up support
for portability in general).

On Mon, Oct 7, 2019 at 9:56 AM Robert Burke  wrote:
>
> The Go SDK uses the old API [1], but it shouldn't be too hard to migrate it.
>
> The main thing I'd want to do at the same time is move the dependencies on 
> the protos out of that package and have those live only in the harness 
> package [2]. I wasn't aware of that particular separation of concerns until 
> much later, but allows for alternative harness implementations.
>
> I have some other work to get the Per-DoFn profiling metrics (eleemnt count, 
> size, time) into the Go SDK this quarter, so I can handle this then.
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/metrics/metrics.go#L474
> [2] 
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/core/runtime/harness
>
> On Fri, Oct 4, 2019 at 6:14 PM Pablo Estrada  wrote:
>>
>> Hello devs,
>> I recently took a look at how Dataflow is retrieving metrics from the Beam 
>> SDK harnesses, and noticed something. As you may (or may not) remember, the 
>> portability API currently has two ways of reporting metrics. Namely, the 
>> newer MonitoringInfo API[1], and the older Metrics one[2].
>>
>> This is somewhat troublesome because now we have two things that do the same 
>> thing. The SDKs report double the amount of metrics[3][4], and I bet it's 
>> confusing for runner implementers.
>>
>> Luckily, it seems like the Flink and Spark runners do use the new API [5][6] 
>> - yay! : ) - so I guess then the only runner that uses the old API is 
>> Dataflow? (internally)
>>
>> Which way does the Samza runner use? +Hai Lu?
>> How about the Go SDK +Robert Burke ? - Ah I bet this uses the old API?
>>
>> If they all use the MonitoringInfos, we may be able to clean up the old api, 
>> and move to the new one (somewhat)soon : )
>>
>> [1] 
>> https://github.com/apache/beam/blob/v2.15.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L395
>> [2] 
>> https://github.com/apache/beam/blob/v2.15.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L391
>> [3] 
>> https://github.com/apache/beam/blob/c1007b678a00ea85671872236edef940a8e56adc/sdks/python/apache_beam/runners/worker/sdk_worker.py#L406-L414
>> [4] 
>> https://github.com/apache/beam/blob/c1007b678a00ea85671872236edef940a8e56adc/sdks/python/apache_beam/runners/worker/sdk_worker.py#L378-L384
>>
>> [5] 
>> https://github.com/apache/beam/blob/44fa33e6518574cb9561f47774e218e0910093fe/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L94-L97
>> [6] 
>> https://github.com/apache/beam/blob/932bd80a17171bd2d8157820ffe09e8389a52b9b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java#L219-L226


Re: [VOTE] Release 2.16.0, release candidate #1

2019-10-04 Thread Robert Bradshaw
OK, this appears to have been a weird config issue on my system
(though the error certainly could have been better). As BEAM-8303 has
a workaround and all else is looking good, I don't think that's worth
another RC.

+1 (binding) to this release.

On Fri, Oct 4, 2019 at 10:56 AM Robert Bradshaw  wrote:
>
> The artifact signatures and contents all look good to me. I've also
> verify the wheels work for the direct runner. However, I'm having an
> issue with trying to run on dataflow with Python 3.6:
>
> python -m apache_beam.examples.wordcount   --input
> gs://clouddfe-robertwb/chicago_taxi_data/eval/data.csv   --output
> gs://clouddfe-robertwb/test/xcounts.txt   --runner=Dataflow
> --project=google.com:clouddfe
> --temp_location=gs://clouddfe-robertwb/fn-api/tmp
> --staging_location=gs://clouddfe-robertwb/tmp
> --sdk_location=staging/apache-beam-2.16.0.zip
> ...
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py",
> line 374, in exists
> self.client.objects.Get(request)  # metadata
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
> line 1100, in Get
> download=download)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/base_api.py",
> line 729, in _RunMethod
> http, http_request, **opts)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
> line 360, in MakeRequest
> max_retry_wait, total_wait_sec))
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio_overrides.py",
> line 43, in retry_func
> return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
> line 294, in HandleExceptionsAndRebuildHttpConnections
> retry_args.exc.status >= 500)):
>
> Is this just me or a wider issue?
>
> On Fri, Oct 4, 2019 at 10:27 AM Pablo Estrada  wrote:
> >
> > Hi all,
> > I looked at https://issues.apache.org/jira/browse/BEAM-8303, and it seems 
> > like the user has a workaround - is that correct?
> > If that's the case, then I vote +1.
> >
> > @Max - lmk if you'd like to discuss further, but for now my vote is on +1.
> > Best
> > -P.
> >
> > On Fri, Oct 4, 2019 at 9:29 AM Mark Liu  wrote:
> >>
> >> +1 (forgot to vote)
> >>
> >> I also triggered Java Nexmark on direct, dataflow, spark and flink runner. 
> >> Didn't saw performance regression from the dashboard 
> >> (https://apache-beam-testing.appspot.com/dashboard-admin)
> >>
> >> On Fri, Oct 4, 2019 at 8:23 AM Mark Liu  wrote:
> >>>
> >>> Thanks for the validation work! I validated following:
> >>>
> >>> - Java Quickstart on direct, dataflow,spark local, flink local runner
> >>> - Java mobile gaming on direct and dataflow runner
> >>> - Python Quickstart in batch and streaming in py2/3.5/3.6/3.7 using 
> >>> wheals/zip
> >>> - Python Mobile Game in batch/streaming in py2/3.5/3.6/3.7 using 
> >>> wheals/zip on direct and dataflow runner
> >>>
> >>> Mark
> >>>
> >>> On Thu, Oct 3, 2019 at 6:57 PM Ahmet Altay  wrote:
> >>>>
> >>>> I see most of the release validations have been completed and marked in 
> >>>> the spreadsheet. Thank you all for doing that. If you have not 
> >>>> validated/voted yet please take a look at the release candidate.
> >>>>
> >>>> On Thu, Oct 3, 2019 at 7:59 AM Thomas Weise  wrote:
> >>>>>
> >>>>> I think there is a different reason why the release manager should 
> >>>>> probably merge/approve all PRs that go into the release branch while 
> >>>>> the release is in progress:
> >>>>>
> >>>>> If/when the need arises for another RC, then only those changes should 
> >>>>> be included that are deemed blockers or explicitly agreed. Otherwise 
> >>>>> the release can potentially be delayed by modifications that invalidate 
&g

Re: Plan for dropping python 2 support

2019-10-04 Thread Robert Bradshaw
e very explicit, one of those 
>>>> dependencies is Dataflow's python pre-portability workers.)
>>>>
>>>> On Thu, Sep 19, 2019 at 5:17 PM Maximilian Michels  wrote:
>>>>>
>>>>> Granted that we just have finalized the Python 3 support, we should
>>>>> allow time for it to mature and for users to make the switch.
>>>>>
>>>>> > Oh, and one more thing, I think it'd make sense for Apache Beam to
>>>>> > sign https://python3statement.org/. The promise is that we'd
>>>>> > discontinue Python 2 support *in* 2020, which is not committing us to
>>>>> > January if we're not ready. Worth a vote?
>>>>>
>>>>> +1
>>>>
>>>>
>>>> +1
>>>>
>>>>>
>>>>>
>>>>> On 19.09.19 15:59, Robert Bradshaw wrote:
>>>>> > Oh, and one more thing, I think it'd make sense for Apache Beam to
>>>>> > sign https://python3statement.org/. The promise is that we'd
>>>>> > discontinue Python 2 support *in* 2020, which is not committing us to
>>>>> > January if we're not ready. Worth a vote?
>>>>> >
>>>>> >
>>>>> > On Thu, Sep 19, 2019 at 3:58 PM Robert Bradshaw  
>>>>> > wrote:
>>>>> >>
>>>>> >> Exactly how long we support Python 2 depends on our users. Other than
>>>>> >> those that speak up (such as yourself, thanks!), it's hard to get a
>>>>> >> handle on how many need Python 2 and for how long. (Should we send out
>>>>> >> a survey? Maybe after some experience with 2.16?)
>>>>
>>>>
>>>> +1, we had some success with collecting information from users using 
>>>> Twitter surveys.
>>>>
>>>>>
>>>>> >>
>>>>> >> On the one hand, the whole ecosystem is finally moving on, and even if
>>>>> >> Beam continues to support Python 2 our dependencies, or other projects
>>>>> >> that are being used in conjunction with Beam, will also be going
>>>>> >> Python 3 only. On the other hand, Beam is, admittedly, quite late to
>>>>> >> the party and could be the one holding people back, and looking at how
>>>>> >> long it took us, if we just barely make it by the end of the year it's
>>>>> >> unreasonable to say at that point "oh, and we're dropping 2.7 at the
>>>>> >> same time."
>>>>> >>
>>>>> >> The good news is that 2.16 is shaping up to be a release I would
>>>>> >> recommend everyone migrate to Python 3 on. The remaining issues are
>>>>> >> things like some issues with main sessions (which already has issues
>>>>> >> in Python 2) and not supporting keyword-only arguments (a new feature,
>>>>> >> not a regression). I would guess that even 2.15 is already good enough
>>>>> >> for most people, at least to kick the tires and running tests to start
>>>>> >> the effort.
>>>>
>>>>
>>>> I share the same sentiment. Beam 2.16 will offer a strong python 3 
>>>> offering. Yes, there are known issues but this is not much different than 
>>>> the known issues for rest of the python offering.
>>>>
>>>>>
>>>>> >>
>>>>> >> (I also agree with the sentiment that once we go 3.x only, it'll be
>>>>> >> likely harder to maintain a 2.x LTS... but the whole LTS thing is
>>>>> >> being discussed in another thread.)
>>>>>
>>>>> >>
>>>>> >> On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  
>>>>> >> wrote:
>>>>> >>>
>>>>> >>> Hi all,
>>>>> >>> I had a read through this thread in the archives. It occurred before 
>>>>> >>> I joined the mailing list, so I hope that this email connects up with 
>>>>> >>> the thread properly for everyone.
>>>>> >>>
>>>>> >>> I'd like to respond to the following points:
>>>>> >>>
>>>>> >>>> I believe we are referring to two separate things with support:
>>>>> >&g

Re: [VOTE] Release 2.16.0, release candidate #1

2019-10-04 Thread Robert Bradshaw
The artifact signatures and contents all look good to me. I've also
verify the wheels work for the direct runner. However, I'm having an
issue with trying to run on dataflow with Python 3.6:

python -m apache_beam.examples.wordcount   --input
gs://clouddfe-robertwb/chicago_taxi_data/eval/data.csv   --output
gs://clouddfe-robertwb/test/xcounts.txt   --runner=Dataflow
--project=google.com:clouddfe
--temp_location=gs://clouddfe-robertwb/fn-api/tmp
--staging_location=gs://clouddfe-robertwb/tmp
--sdk_location=staging/apache-beam-2.16.0.zip
...
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py",
line 374, in exists
self.client.objects.Get(request)  # metadata
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
line 1100, in Get
download=download)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/base_api.py",
line 729, in _RunMethod
http, http_request, **opts)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
line 360, in MakeRequest
max_retry_wait, total_wait_sec))
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio_overrides.py",
line 43, in retry_func
return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
line 294, in HandleExceptionsAndRebuildHttpConnections
retry_args.exc.status >= 500)):

Is this just me or a wider issue?

On Fri, Oct 4, 2019 at 10:27 AM Pablo Estrada  wrote:
>
> Hi all,
> I looked at https://issues.apache.org/jira/browse/BEAM-8303, and it seems 
> like the user has a workaround - is that correct?
> If that's the case, then I vote +1.
>
> @Max - lmk if you'd like to discuss further, but for now my vote is on +1.
> Best
> -P.
>
> On Fri, Oct 4, 2019 at 9:29 AM Mark Liu  wrote:
>>
>> +1 (forgot to vote)
>>
>> I also triggered Java Nexmark on direct, dataflow, spark and flink runner. 
>> Didn't saw performance regression from the dashboard 
>> (https://apache-beam-testing.appspot.com/dashboard-admin)
>>
>> On Fri, Oct 4, 2019 at 8:23 AM Mark Liu  wrote:
>>>
>>> Thanks for the validation work! I validated following:
>>>
>>> - Java Quickstart on direct, dataflow,spark local, flink local runner
>>> - Java mobile gaming on direct and dataflow runner
>>> - Python Quickstart in batch and streaming in py2/3.5/3.6/3.7 using 
>>> wheals/zip
>>> - Python Mobile Game in batch/streaming in py2/3.5/3.6/3.7 using wheals/zip 
>>> on direct and dataflow runner
>>>
>>> Mark
>>>
>>> On Thu, Oct 3, 2019 at 6:57 PM Ahmet Altay  wrote:

 I see most of the release validations have been completed and marked in 
 the spreadsheet. Thank you all for doing that. If you have not 
 validated/voted yet please take a look at the release candidate.

 On Thu, Oct 3, 2019 at 7:59 AM Thomas Weise  wrote:
>
> I think there is a different reason why the release manager should 
> probably merge/approve all PRs that go into the release branch while the 
> release is in progress:
>
> If/when the need arises for another RC, then only those changes should be 
> included that are deemed blockers or explicitly agreed. Otherwise the 
> release can potentially be delayed by modifications that invalidate prior 
> verification or introduce new instability.


 I agree with this reasoning. It expresses my concern in a more clear way.

>
>
> Thomas
>
>
> On Thu, Oct 3, 2019 at 3:12 AM Maximilian Michels  wrote:
>>
>>  > For the next time, may I suggest asking release manager to do the
>>  > merging to the release branch. We do not know whether there will be an
>>  > RC2 or not. And if there will not be an RC2 release branch as of now
>>  > does not directly correspond to what will be released.
>>
>> The ground truth for releases are the release tags, not the release
>> branches. Downstream projects should not depend on the release branches.
>> Release branches are merely important for the process of creating a
>> release, but they lose validity after the RC has been created and 
>> released.
>>
>> On 02.10.19 11:45, Ahmet Altay wrote:
>> > +1 (validated python quickstarts). Thank you Mark.
>> >
>> > On Wed, Oct 2, 2019 at 10:49 AM Maximilian Michels > > > wrote:
>> >
>> > Thanks for preparing the release, Mark! I would like to address
>> > https://issues.apache.

Re: Dockerhub push denied for py3.6 and py3.7 image

2019-10-02 Thread Robert Bradshaw
Please add robertwb0 as well.

On Wed, Oct 2, 2019 at 9:09 AM Ahmet Altay  wrote:
>
>
>
> On Tue, Oct 1, 2019 at 8:44 PM Pablo Estrada  wrote:
>>
>> When she set up the repo, Hannah requested PMC members to ask for 
>> privileges, so I did.
>> The set of admins currently is just Hannah and myself - and I don't think 
>> this is available in a public page.
>>
>> We could either have a PMC-managed account, or allow more PMC members to 
>> have admin privileges - for redundancy.
>
>
> I remember we agreed to add any interested PMC members for redundancy.
>
> Could you add my username ('aaltaybeam') to the list please?
>
> Ahmet
>
>>
>> Best
>> -P.
>>
>> On Tue, Oct 1, 2019 at 6:44 PM Ahmet Altay  wrote:
>>>
>>> Who are the admins on dockerhub currently? Is there a page that shows a 
>>> list? The next person doing the release will probably run into similar 
>>> issues. For example, pypi page for beam [1] shows lists of maintainers.
>>>
>>> [1] https://pypi.org/project/apache-beam/
>>>
>>> Thank you,
>>> Ahmet
>>>
>>> On Tue, Oct 1, 2019 at 11:32 AM Mark Liu  wrote:

 I can push them now. Thank you Pablo!

 On Tue, Oct 1, 2019 at 11:05 AM Pablo Estrada  wrote:
>
> You were right that the push permissions for repository maintainers were 
> missing. I've just added the permissions, and you should be able to push 
> to them now.
> Thanks Mark!
>
> On Tue, Oct 1, 2019 at 11:02 AM Pablo Estrada  wrote:
>>
>> I'll check for you. One second.
>>
>> On Tue, Oct 1, 2019 at 10:32 AM Mark Liu  wrote:
>>>
>>> Hello Dockerhub Admins,
>>>
>>> I was able to push Java, Go, py2.7 and py3.5 images to 
>>> hub.docker.com/u/apachebeam for 2.16 release, but failed for py3.6 and 
>>> py3.7 due to "denied: requested access to the resource is denied". 
>>> Wondering if I missed some permissions. Can any Dockerhub admins help 
>>> me check it?
>>>
>>> Thanks,
>>> Mark, Release Manager


Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-01 Thread Robert Bradshaw
For this specific usecase, I would suggest this be done via PTranform URNs.
E.g. one could have a GroupByKeyOneShot whose implementation is

input
.apply(GroupByKey.of()
.apply(kv -> KV.of(kv.key(), kv.iterator())

A runner would be free to recognize and optimize this in the graph (based
on its urn) and swap out a more efficient implementation. Of course a
Coder would have to be introduced, and the semantics of
PCollection are a bit odd due to the inherently mutable nature of
Iterators. (Possibly a ReducePerKey transform would be a better
abstraction.)


On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:

> The car analogy was meant to say, that in real world you have to make
> decision before you take any action. There is no retroactivity possible.
>
> Reuven pointed out, that it is possible (although it seems a little weird
> to me, but that is the only thing I can tell against it :-)), that the way
> a grouped PCollection is produced might be out of control of a consuming
> operator. One example of this might be, that the grouping is produced in a
> submodule (some library), but still, the consumer wants to be able to
> specify if he wants or doesn't want reiterations. There still is a
> "classical" solution to this - the library might expose an interface to
> specify a factory for the grouped PCollection, so that the user of the
> library will be able to specify what he wants. But we can say, that we
> don't want to force users (or authors of libraries) to do that. That's okay
> for me.
>
> If we move on, our next option might be to specify the annotation on the
> consumer (as suggested), but that has all the "not really nice" properties
> of being counter-intuitive, ignoring strong types, etc., etc., for which
> reason I think that this should be ruled out as well.
>
> This leaves us with a single option (at least I have not figured out any
> other) - which is we can bundle GBK and associated ParDo into atomic
> PTransform, which can then be overridden by runners that need special
> handling of this situation - these are all runners that need buffer data to
> memory in order to support reiterations (spark and flink, note that this
> problem arises only for batch case, because in streaming case, one can
> reasonably assume that the data resides in a state that supports
> reiterations). But - we already have this PTransform in Euphoria, it is
> called ReduceByKey, and has all the required properties (technically, it is
> not a PTransform now, but that is a minor detail and can be changed
> trivially).
>
> So, the direction I was trying to take this discussion was - what could be
> the best way for a runner to natively support a PTransform from a DSL? I
> can imagine several options:
>
>  a) support it directly and let runners depend on the DSL (compileOnly
> dependency might suffice, because users will include the DSL into their
> code to be able to use it)
>
>  b) create an interface in runners for user-code to be able to provide
> translation for user-specified operators (this could be absolutely generic,
> DSLs might just use this feature the same way any user could), after all
> runners already use a concept of Translator, but that is pretty much
> copy-pasted, not abstracted into a general purpose one
>
>  c) move the operators that need to be translated into core
>
> The option (c) then leaves open questions related to - if we would want to
> move other operators to core, would this be the right time to ask questions
> if our current set of "core" operators is the ideal one? Or could this be
> optimized?
>
> Jan
> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>
> In the car analogy, you have something this:
>
> Iterable: car
> Iterator: taxi ride
>
> They are related, but not as variations of a common concept.
>
> In the discussion of Combine vs RSBK, if the reducer is required to be an
> associative and commutative operator, then it is the same thing under a
> different name. If the reducer can be non-associative or non-commutative,
> then it admits fewer transformations/optimizations.
>
> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
> transform that combines the iterator by concatenation, I think you do get
> an internally consistent system. To execute efficiently, you need to always
> identify and replace the GroupByKey operation with a primitive one. It does
> make some sense to expose the weakest primitives for the sake of DSLs. But
> they are very poorly suited for end-users, and for GBK on most runners you
> get the more powerful one for free.
>
> Kenn
>
> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:
>
>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation.
>>
>> I don't quite agree. It is not very specific to Spark, it is specific to
>> generally all runners, that produce grouped elements in a way that is not
>> reiterable. That is the key property. The exam

Re: [VOTE] Sign a pledge to discontinue support of Python 2 in 2020.

2019-10-01 Thread Robert Bradshaw
The correct link is https://python3statement.org/

On Tue, Oct 1, 2019 at 10:14 AM Mark Liu  wrote:
>
> +1
>
> btw, the link (http://python3stament.org) you provided is broken.
>
> On Tue, Oct 1, 2019 at 9:44 AM Udi Meiri  wrote:
>>
>> +1
>>
>> On Tue, Oct 1, 2019 at 3:22 AM Łukasz Gajowy  wrote:
>>>
>>> +1
>>>
>>> wt., 1 paź 2019 o 11:29 Maximilian Michels  napisał(a):

 +1

 On 30.09.19 23:03, Reza Rokni wrote:
 > +1
 >
 > On Tue, 1 Oct 2019 at 13:54, Tanay Tummalapalli >>> > > wrote:
 >
 > +1
 >
 > On Tue, Oct 1, 2019 at 8:19 AM Suneel Marthi >>> > > wrote:
 >
 > +1
 >
 > On Mon, Sep 30, 2019 at 10:33 PM Manu Zhang
 > mailto:owenzhang1...@gmail.com>> wrote:
 >
 > +1
 >
 > On Tue, Oct 1, 2019 at 9:44 AM Austin Bennett
 > >>> > > wrote:
 >
 > +1
 >
 > On Mon, Sep 30, 2019 at 5:22 PM Valentyn Tymofieiev
 > mailto:valen...@google.com>> wrote:
 >
 > Hi everyone,
 >
 > Please vote whether to sign a pledge on behalf of
 > Apache Beam to sunset Beam Python 2 offering (in new
 > releases) in 2020 on http://python3stament.org as
 > follows:
 >
 > [ ] +1: Sign a pledge to discontinue support of
 > Python 2 in Beam in 2020.
 > [ ] -1: Do not sign a pledge to discontinue support
 > of Python 2 in Beam in 2020.
 >
 > The motivation and details for this vote were
 > discussed in [1, 2]. Please follow up in [2] if you
 > have any questions.
 >
 > This is a procedural vote [3] that will follow the
 > majority approval rules and will be open for at
 > least 72 hours.
 >
 > Thanks,
 > Valentyn
 >
 > [1]
 > 
 > https://lists.apache.org/thread.html/eba6caa58ea79a7ecbc8560d1c680a366b44c531d96ce5c699d41535@%3Cdev.beam.apache.org%3E
 > [2]
 > 
 > https://lists.apache.org/thread.html/456631fe1a696c537ef8ebfee42cd3ea8121bf7c639c52da5f7032e7@%3Cdev.beam.apache.org%3E
 > [3] https://www.apache.org/foundation/voting.html
 >
 >
 >
 > --
 >
 > This email may be confidential and privileged. If you received this
 > communication by mistake, please don't forward it to anyone else, please
 > erase all copies and attachments, and please let me know that it has
 > gone to the wrong person.
 >
 > The above terms reflect a potential business arrangement, are provided
 > solely as a basis for further discussion, and are not intended to be and
 > do not constitute a legally binding obligation. No legally binding
 > obligations will be created, implied, or inferred until an agreement in
 > final form is executed in writing by all parties involved.
 >


Re: [VOTE] Sign a pledge to discontinue support of Python 2 in 2020.

2019-09-30 Thread Robert Bradshaw
+1

On Mon, Sep 30, 2019 at 5:35 PM David Cavazos  wrote:
>
> +1
>
> On Mon, Sep 30, 2019 at 5:27 PM Ahmet Altay  wrote:
>>
>> +1
>>
>> On Mon, Sep 30, 2019 at 5:22 PM Valentyn Tymofieiev  
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> Please vote whether to sign a pledge on behalf of Apache Beam to sunset 
>>> Beam Python 2 offering (in new releases) in 2020 on 
>>> http://python3stament.org as follows:
>>>
>>> [ ] +1: Sign a pledge to discontinue support of Python 2 in Beam in 2020.
>>> [ ] -1: Do not sign a pledge to discontinue support of Python 2 in Beam in 
>>> 2020.
>>>
>>> The motivation and details for this vote were discussed in [1, 2]. Please 
>>> follow up in [2] if you have any questions.
>>>
>>> This is a procedural vote [3] that will follow the majority approval rules 
>>> and will be open for at least 72 hours.
>>>
>>> Thanks,
>>> Valentyn
>>>
>>> [1] 
>>> https://lists.apache.org/thread.html/eba6caa58ea79a7ecbc8560d1c680a366b44c531d96ce5c699d41535@%3Cdev.beam.apache.org%3E
>>> [2] 
>>> https://lists.apache.org/thread.html/456631fe1a696c537ef8ebfee42cd3ea8121bf7c639c52da5f7032e7@%3Cdev.beam.apache.org%3E
>>> [3] https://www.apache.org/foundation/voting.html
>>>


Re: Why is there no standard boolean coder?

2019-09-27 Thread Robert Bradshaw
Yes, go ahead and do this (though for your usecase I'm hoping we'll be
able to switch to schemas soon).

On Fri, Sep 27, 2019 at 5:35 PM Chad Dombrova  wrote:
>
> Would BooleanCoder continue to fall into this category?  I was under the 
> impression we might make it a full fledge standard coder with this PR.
>
>
>
> On Fri, Sep 27, 2019 at 5:32 PM Brian Hulette  wrote:
>>
>> +1, thank you!
>>
>> Note In my Row Coder PR I added a new section for "Additional Standard 
>> Coders" - i.e. coders that have a URN, but aren't required for a new 
>> runner/sdk to implement the beam model: 
>> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R646
>>
>> I think this would belong there as well, assuming that is a distinction we 
>> want to make.
>>
>> On Fri, Sep 27, 2019 at 5:22 PM Thomas Weise  wrote:
>>>
>>> +1 for adding the coder
>>>
>>> Please also add a test here: 
>>> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
>>>
>>>
>>> On Fri, Sep 27, 2019 at 5:17 PM Chad Dombrova  wrote:
>>>>
>>>> Are there any dissenting votes to making a BooleanCoder a standard 
>>>> (portable) coder?
>>>>
>>>> I'm happy to make a PR to implement a BooleanCoder in python (and to add 
>>>> the Java BooleanCoder to the ModelCoderRegistrar) if everyone agrees that 
>>>> this is useful.
>>>>
>>>> -chad
>>>>
>>>>
>>>> On Fri, Sep 27, 2019 at 3:32 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>> I think boolean is useful to have. What I'm more skeptical of is
>>>>> adding standard types for variations like UnsignedInteger16, etc. that
>>>>> don't have natural representations in all languages.
>>>>>
>>>>> On Fri, Sep 27, 2019 at 2:46 PM Brian Hulette  wrote:
>>>>> >
>>>>> > Some more context from an offline discussion I had with +Robert 
>>>>> > Bradshaw a while ago: We both agreed all of the coders listed in 
>>>>> > BEAM-7996 should be implemented in Python, but didn't come to a 
>>>>> > conclusion on whether or not they should actually be _standard_ coders, 
>>>>> > versus just being implicitly standard as part of row coder.
>>>>> >
>>>>> > On Fri, Sep 27, 2019 at 2:29 PM Kenneth Knowles  wrote:
>>>>> >>
>>>>> >> Yes, noted here: 
>>>>> >> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R678
>>>>> >>  and that links to https://issues.apache.org/jira/browse/BEAM-7996
>>>>> >>
>>>>> >> Kenn
>>>>> >>
>>>>> >> On Fri, Sep 27, 2019 at 12:57 PM Reuven Lax  wrote:
>>>>> >>>
>>>>> >>> Java has one, implemented as a byte coder. My guess is that nobody 
>>>>> >>> has gotten around to implementing it yet for portability.
>>>>> >>>
>>>>> >>> On Fri, Sep 27, 2019 at 12:44 PM Chad Dombrova  
>>>>> >>> wrote:
>>>>> >>>>
>>>>> >>>> Hi all,
>>>>> >>>> It seems a bit unfortunate that there isn’t a portable way to 
>>>>> >>>> serialize a boolean value.
>>>>> >>>>
>>>>> >>>> I’m working on porting my external PubsubIO PR over to use the 
>>>>> >>>> improved schema-based external transform API in python, but because 
>>>>> >>>> of this limitation I can’t use boolean values. For example, this 
>>>>> >>>> fails:
>>>>> >>>>
>>>>> >>>> ReadFromPubsubSchema = typing.NamedTuple(
>>>>> >>>> 'ReadFromPubsubSchema',
>>>>> >>>> [
>>>>> >>>> ('topic', typing.Optional[unicode]),
>>>>> >>>> ('subscription', typing.Optional[unicode]),
>>>>> >>>> ('id_label',  typing.Optional[unicode]),
>>>>> >>>> ('with_attributes', bool),
>>>>> >>>> ('timestamp_attribute',  typing.Optional[unicode]),
>>>>> >>>> ]
>>>>> >>>> )
>>>>> >>>>
>>>>> >>>> It fails because coders.get_coder(bool) returns the non-portable 
>>>>> >>>> pickle coder.
>>>>> >>>>
>>>>> >>>> In the short term I can hack something into the external transform 
>>>>> >>>> API to use varint coder for bools, but this kind of hacky approach 
>>>>> >>>> to portability won’t work in scenarios where round-tripping is 
>>>>> >>>> required without user intervention. In other words, in python it is 
>>>>> >>>> not uncommon to test if x is True, in which case the integer 1 would 
>>>>> >>>> fail this test. All of that is to say that a BooleanCoder would be a 
>>>>> >>>> convenient way to ensure the proper type is used everywhere.
>>>>> >>>>
>>>>> >>>> So, I was just wondering why it’s not there? Are there concerns over 
>>>>> >>>> whether booleans are universal enough to make part of the 
>>>>> >>>> portability standard?
>>>>> >>>>
>>>>> >>>> -chad


Re: Why is there no standard boolean coder?

2019-09-27 Thread Robert Bradshaw
I think boolean is useful to have. What I'm more skeptical of is
adding standard types for variations like UnsignedInteger16, etc. that
don't have natural representations in all languages.

On Fri, Sep 27, 2019 at 2:46 PM Brian Hulette  wrote:
>
> Some more context from an offline discussion I had with +Robert Bradshaw a 
> while ago: We both agreed all of the coders listed in BEAM-7996 should be 
> implemented in Python, but didn't come to a conclusion on whether or not they 
> should actually be _standard_ coders, versus just being implicitly standard 
> as part of row coder.
>
> On Fri, Sep 27, 2019 at 2:29 PM Kenneth Knowles  wrote:
>>
>> Yes, noted here: 
>> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R678
>>  and that links to https://issues.apache.org/jira/browse/BEAM-7996
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 12:57 PM Reuven Lax  wrote:
>>>
>>> Java has one, implemented as a byte coder. My guess is that nobody has 
>>> gotten around to implementing it yet for portability.
>>>
>>> On Fri, Sep 27, 2019 at 12:44 PM Chad Dombrova  wrote:
>>>>
>>>> Hi all,
>>>> It seems a bit unfortunate that there isn’t a portable way to serialize a 
>>>> boolean value.
>>>>
>>>> I’m working on porting my external PubsubIO PR over to use the improved 
>>>> schema-based external transform API in python, but because of this 
>>>> limitation I can’t use boolean values. For example, this fails:
>>>>
>>>> ReadFromPubsubSchema = typing.NamedTuple(
>>>> 'ReadFromPubsubSchema',
>>>> [
>>>> ('topic', typing.Optional[unicode]),
>>>> ('subscription', typing.Optional[unicode]),
>>>> ('id_label',  typing.Optional[unicode]),
>>>> ('with_attributes', bool),
>>>> ('timestamp_attribute',  typing.Optional[unicode]),
>>>> ]
>>>> )
>>>>
>>>> It fails because coders.get_coder(bool) returns the non-portable pickle 
>>>> coder.
>>>>
>>>> In the short term I can hack something into the external transform API to 
>>>> use varint coder for bools, but this kind of hacky approach to portability 
>>>> won’t work in scenarios where round-tripping is required without user 
>>>> intervention. In other words, in python it is not uncommon to test if x is 
>>>> True, in which case the integer 1 would fail this test. All of that is to 
>>>> say that a BooleanCoder would be a convenient way to ensure the proper 
>>>> type is used everywhere.
>>>>
>>>> So, I was just wondering why it’s not there? Are there concerns over 
>>>> whether booleans are universal enough to make part of the portability 
>>>> standard?
>>>>
>>>> -chad


Re: Collecting feedback for Beam usage

2019-09-26 Thread Robert Bradshaw
eople don't accidentally enable 
>> it in their quick-running direct runner unit tests, causing lots of traffic.
>> - I would not dismiss the possibility of spam and attacks.
>>
>> I'd recommend to start by listing the questions we're hoping to answer using 
>> the collected feedback, and then judging whether the proposed method indeed 
>> allows answering them while respecting the users' privacy.
>>
>> On Tue, Sep 24, 2019 at 1:49 PM Lukasz Cwik  wrote:
>>>
>>> One of the options could be to just display the URL and not to phone home. 
>>> I would like it so that users can integrate this into their deployment 
>>> solution so we get regular stats instead of only when a user decides to run 
>>> a pipeline manually.
>>>
>>> On Tue, Sep 24, 2019 at 11:13 AM Robert Bradshaw  
>>> wrote:
>>>>
>>>> I think the goal is to lower the barrier of entry. Displaying a URL to
>>>> click on while waiting for your pipeline to start up, that contains
>>>> all the data explicitly visible, is about as easy as it gets.
>>>> Remembering to run a new (probably not as authentic) pipeline with
>>>> that flag is less so.
>>>>
>>>> On Tue, Sep 24, 2019 at 11:04 AM Mikhail Gryzykhin  
>>>> wrote:
>>>> >
>>>> > I'm with Luke on this. We can add a set of flags to send home stats and 
>>>> > crash dumps if user agrees. If we keep code isolated, it will be easy 
>>>> > enough for user to check what is being sent.
>>>> >
>>>> > One more heavy-weight option is to also allow user configure and persist 
>>>> > what information he is ok with sharing.
>>>> >
>>>> > --Mikhail
>>>> >
>>>> >
>>>> > On Tue, Sep 24, 2019 at 10:02 AM Lukasz Cwik  wrote:
>>>> >>
>>>> >> Why not add a flag to the SDK that would do the phone home when 
>>>> >> specified?
>>>> >>
>>>> >> From a support perspective it would be useful to know:
>>>> >> * SDK version
>>>> >> * Runner
>>>> >> * SDK provided PTransforms that are used
>>>> >> * Features like user state/timers/side inputs/splittable dofns/...
>>>> >> * Graph complexity (# nodes, # branches, ...)
>>>> >> * Pipeline failed or succeeded
>>>> >>
>>>> >> On Mon, Sep 23, 2019 at 3:18 PM Robert Bradshaw  
>>>> >> wrote:
>>>> >>>
>>>> >>> On Mon, Sep 23, 2019 at 3:08 PM Brian Hulette  
>>>> >>> wrote:
>>>> >>> >
>>>> >>> > Would people actually click on that link though? I think Kyle has a 
>>>> >>> > point that in practice users would only find and click on that link 
>>>> >>> > when they're having some kind of issue, especially if the link has 
>>>> >>> > "feedback" in it.
>>>> >>>
>>>> >>> I think the idea is that we would make the link very light-weight,
>>>> >>> kind of like a survey (but even easier as it's pre-populated).
>>>> >>> Basically an opt-in phone-home. If we don't collect any personal data
>>>> >>> (not even IP/geo, just (say) version + runner, all visible in the
>>>> >>> URL), no need to guard/anonymize (and this may be sufficient--I don't
>>>> >>> think we have to worry about spammers and ballot stuffers given the
>>>> >>> target audience). If we can catch people while they wait for their
>>>> >>> pipeline to start up (and/or complete), this is a great time to get
>>>> >>> some feedback.
>>>> >>>
>>>> >>> > I agree usage data would be really valuable, but I'm not sure that 
>>>> >>> > this approach would get us good data. Is there a way to get download 
>>>> >>> > statistics for the different runner artifacts? Maybe that could be a 
>>>> >>> > better metric to compare usage.
>>>> >>>
>>>> >>> This'd be useful too, but hard to get and very noisy.
>>>> >>>
>>>> >>> >
>>>> >>> > On Mon, Sep 23, 2019 at 2:57 PM Ankur Goenka  
>>>> >>> > wrote:
>>>&

Re: Jenkins queue times steadily increasing for a few months now

2019-09-24 Thread Robert Bradshaw
Yeah, that's useful. I was asking about getting things at the jenkins
job level. E.g. are our PostCommits taking up all the time, or our
Precommits?

On Tue, Sep 24, 2019 at 1:23 PM Lukasz Cwik  wrote:
>
> We can get the per gradle task profile with the --profile flag: 
> https://jakewharton.com/static/files/trace/profile.html
> This information also appears within the build scans that are sent to Gradle.
>
> Integrating with either of these sources of information would allow us to 
> figure out whether its new tasks or old tasks taking longer.
>
> On Tue, Sep 24, 2019 at 12:23 PM Robert Bradshaw  wrote:
>>
>> Does anyone know how to gather stats on where the time is being spent?
>> Several times the idea of consolidating many of the (expensive)
>> validates runner integration tests into a single pipeline, and then
>> running things individually only if that fails, has come up. I think
>> that'd be a big win if indeed this is where our time is being spent.
>>
>> On Tue, Sep 24, 2019 at 12:13 PM Daniel Oliveira  
>> wrote:
>> >
>> > Those ideas all sound good. I especially agree with trying to reduce tests 
>> > first and then if we've done all we can there and latency is still too 
>> > high, it means we need more workers. Also in addition to reducing the 
>> > amount of tests, there's also running less important tests less 
>> > frequently, particularly when it comes to postcommits since many of those 
>> > are resource intensive. That would require people with good context around 
>> > what our many postcommits are used for.
>> >
>> > Another idea I thought of is trying to avoid running automated tests 
>> > outside of peak coding times. Ideally, during the times when we get the 
>> > greatest amounts of PRs (and therefore precommits) we shouldn't have any 
>> > postcommits running. If we have both pre and postcommits going at the same 
>> > time during peak hours, our queue times will shoot up even if the total 
>> > amount of work doesn't change much.
>> >
>> > Btw, you mentioned that this was a problem last year. Do you have any 
>> > links to discussions about that? It seems like it could be useful.
>> >
>> > On Thu, Sep 19, 2019 at 1:10 PM Mikhail Gryzykhin  
>> > wrote:
>> >>
>> >> Hi Daniel,
>> >>
>> >> Generally this looks feasible since jobs wait for new worker to be 
>> >> available to start.
>> >>
>> >> Over time we added more tests and did not deprecate enough, this 
>> >> increases load on workers. I wonder if we can add something like total 
>> >> runtime of all running jobs? This will be a safeguard metric that will 
>> >> show amount of time we actually run jobs. If it increases with same 
>> >> amount of workers, that will prove that we are overloading them (inverse 
>> >> is not necessarily correct).
>> >>
>> >> On addressing this, we can review approaches we took last year and see if 
>> >> any of them apply. If I do some brainstorming, following ideas come to 
>> >> mind: add more work force, reduce amount of tests, do better work on 
>> >> filtering out irrelevant tests, cancel irrelevant jobs (ie: cancel tests 
>> >> if linter fails) and/or add option for cancelling irrelevant jobs. One 
>> >> more big point can be effort on deflaking, but we seem to be decent in 
>> >> this area.
>> >>
>> >> Regards,
>> >> Mikhail.
>> >>
>> >>
>> >> On Thu, Sep 19, 2019 at 12:22 PM Daniel Oliveira  
>> >> wrote:
>> >>>
>> >>> Hi everyone,
>> >>>
>> >>> A little while ago I was taking a look at the Precommit Latency metrics 
>> >>> on Grafana (link) and saw that the monthly 90th percentile metric has 
>> >>> been really increasing the past few months, from around 10 minutes to 
>> >>> currently around 30 minutes.
>> >>>
>> >>> After doing some light digging I was shown this page (beam load 
>> >>> statistics) which seems to imply that queue times are shooting up when 
>> >>> all the test executors are occupied, and it seems this is happening 
>> >>> longer and more often recently. I also took a look at the commit history 
>> >>> for our Jenkins tests and I see that new tests have steadily been added.
>> >>>
>> >>> I wanted to bring this up with the dev@ to ask:
>> >>>
>> >>> 1. Is this accurate? Can anyone provide insight into the metrics? Does 
>> >>> anyone know how to double check my assumptions with more concrete 
>> >>> metrics?
>> >>>
>> >>> 2. Does anyone have ideas on how to address this?
>> >>>
>> >>> Thanks,
>> >>> Daniel Oliveira


Re: Jenkins queue times steadily increasing for a few months now

2019-09-24 Thread Robert Bradshaw
Does anyone know how to gather stats on where the time is being spent?
Several times the idea of consolidating many of the (expensive)
validates runner integration tests into a single pipeline, and then
running things individually only if that fails, has come up. I think
that'd be a big win if indeed this is where our time is being spent.

On Tue, Sep 24, 2019 at 12:13 PM Daniel Oliveira  wrote:
>
> Those ideas all sound good. I especially agree with trying to reduce tests 
> first and then if we've done all we can there and latency is still too high, 
> it means we need more workers. Also in addition to reducing the amount of 
> tests, there's also running less important tests less frequently, 
> particularly when it comes to postcommits since many of those are resource 
> intensive. That would require people with good context around what our many 
> postcommits are used for.
>
> Another idea I thought of is trying to avoid running automated tests outside 
> of peak coding times. Ideally, during the times when we get the greatest 
> amounts of PRs (and therefore precommits) we shouldn't have any postcommits 
> running. If we have both pre and postcommits going at the same time during 
> peak hours, our queue times will shoot up even if the total amount of work 
> doesn't change much.
>
> Btw, you mentioned that this was a problem last year. Do you have any links 
> to discussions about that? It seems like it could be useful.
>
> On Thu, Sep 19, 2019 at 1:10 PM Mikhail Gryzykhin  wrote:
>>
>> Hi Daniel,
>>
>> Generally this looks feasible since jobs wait for new worker to be available 
>> to start.
>>
>> Over time we added more tests and did not deprecate enough, this increases 
>> load on workers. I wonder if we can add something like total runtime of all 
>> running jobs? This will be a safeguard metric that will show amount of time 
>> we actually run jobs. If it increases with same amount of workers, that will 
>> prove that we are overloading them (inverse is not necessarily correct).
>>
>> On addressing this, we can review approaches we took last year and see if 
>> any of them apply. If I do some brainstorming, following ideas come to mind: 
>> add more work force, reduce amount of tests, do better work on filtering out 
>> irrelevant tests, cancel irrelevant jobs (ie: cancel tests if linter fails) 
>> and/or add option for cancelling irrelevant jobs. One more big point can be 
>> effort on deflaking, but we seem to be decent in this area.
>>
>> Regards,
>> Mikhail.
>>
>>
>> On Thu, Sep 19, 2019 at 12:22 PM Daniel Oliveira  
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> A little while ago I was taking a look at the Precommit Latency metrics on 
>>> Grafana (link) and saw that the monthly 90th percentile metric has been 
>>> really increasing the past few months, from around 10 minutes to currently 
>>> around 30 minutes.
>>>
>>> After doing some light digging I was shown this page (beam load statistics) 
>>> which seems to imply that queue times are shooting up when all the test 
>>> executors are occupied, and it seems this is happening longer and more 
>>> often recently. I also took a look at the commit history for our Jenkins 
>>> tests and I see that new tests have steadily been added.
>>>
>>> I wanted to bring this up with the dev@ to ask:
>>>
>>> 1. Is this accurate? Can anyone provide insight into the metrics? Does 
>>> anyone know how to double check my assumptions with more concrete metrics?
>>>
>>> 2. Does anyone have ideas on how to address this?
>>>
>>> Thanks,
>>> Daniel Oliveira


Re: Collecting feedback for Beam usage

2019-09-24 Thread Robert Bradshaw
I think the goal is to lower the barrier of entry. Displaying a URL to
click on while waiting for your pipeline to start up, that contains
all the data explicitly visible, is about as easy as it gets.
Remembering to run a new (probably not as authentic) pipeline with
that flag is less so.

On Tue, Sep 24, 2019 at 11:04 AM Mikhail Gryzykhin  wrote:
>
> I'm with Luke on this. We can add a set of flags to send home stats and crash 
> dumps if user agrees. If we keep code isolated, it will be easy enough for 
> user to check what is being sent.
>
> One more heavy-weight option is to also allow user configure and persist what 
> information he is ok with sharing.
>
> --Mikhail
>
>
> On Tue, Sep 24, 2019 at 10:02 AM Lukasz Cwik  wrote:
>>
>> Why not add a flag to the SDK that would do the phone home when specified?
>>
>> From a support perspective it would be useful to know:
>> * SDK version
>> * Runner
>> * SDK provided PTransforms that are used
>> * Features like user state/timers/side inputs/splittable dofns/...
>> * Graph complexity (# nodes, # branches, ...)
>> * Pipeline failed or succeeded
>>
>> On Mon, Sep 23, 2019 at 3:18 PM Robert Bradshaw  wrote:
>>>
>>> On Mon, Sep 23, 2019 at 3:08 PM Brian Hulette  wrote:
>>> >
>>> > Would people actually click on that link though? I think Kyle has a point 
>>> > that in practice users would only find and click on that link when 
>>> > they're having some kind of issue, especially if the link has "feedback" 
>>> > in it.
>>>
>>> I think the idea is that we would make the link very light-weight,
>>> kind of like a survey (but even easier as it's pre-populated).
>>> Basically an opt-in phone-home. If we don't collect any personal data
>>> (not even IP/geo, just (say) version + runner, all visible in the
>>> URL), no need to guard/anonymize (and this may be sufficient--I don't
>>> think we have to worry about spammers and ballot stuffers given the
>>> target audience). If we can catch people while they wait for their
>>> pipeline to start up (and/or complete), this is a great time to get
>>> some feedback.
>>>
>>> > I agree usage data would be really valuable, but I'm not sure that this 
>>> > approach would get us good data. Is there a way to get download 
>>> > statistics for the different runner artifacts? Maybe that could be a 
>>> > better metric to compare usage.
>>>
>>> This'd be useful too, but hard to get and very noisy.
>>>
>>> >
>>> > On Mon, Sep 23, 2019 at 2:57 PM Ankur Goenka  wrote:
>>> >>
>>> >> I agree, these are the questions that need to be answered.
>>> >> The data can be anonymize and stored as public data in BigQuery or some 
>>> >> other place.
>>> >>
>>> >> The intent is to get the usage statistics so that we can get to know 
>>> >> what people are using Flink or Spark etc and not intended for discussion 
>>> >> or a help channel.
>>> >> I also think that we don't need to monitor this actively as it's more 
>>> >> like a survey rather than active channel to get issues resolved.
>>> >>
>>> >> If we think its useful for the community then we come up with the 
>>> >> solution as to how can we do this (similar to how we released the 
>>> >> container images).
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Sep 20, 2019 at 4:38 PM Kyle Weaver  wrote:
>>> >>>
>>> >>> There are some logistics that would need worked out. For example, Where 
>>> >>> would the data go? Who would own it?
>>> >>>
>>> >>> Also, I'm not convinced we need yet another place to discuss Beam when 
>>> >>> we already have discussed the challenge of simultaneously monitoring 
>>> >>> mailing lists, Stack Overflow, Slack, etc. While "how do you use Beam" 
>>> >>> is certainly an interesting question, and I'd be curious to know that 
>>> >>> >= X many people use a certain runner, I'm not sure answers to these 
>>> >>> questions are as useful for guiding the future of Beam as discussions 
>>> >>> on the dev/users lists, etc. as the latter likely result in more 
>>> >>> depth/specific feedback.
>>> >>>
>>> >>> How

Re: Collecting feedback for Beam usage

2019-09-23 Thread Robert Bradshaw
On Mon, Sep 23, 2019 at 3:08 PM Brian Hulette  wrote:
>
> Would people actually click on that link though? I think Kyle has a point 
> that in practice users would only find and click on that link when they're 
> having some kind of issue, especially if the link has "feedback" in it.

I think the idea is that we would make the link very light-weight,
kind of like a survey (but even easier as it's pre-populated).
Basically an opt-in phone-home. If we don't collect any personal data
(not even IP/geo, just (say) version + runner, all visible in the
URL), no need to guard/anonymize (and this may be sufficient--I don't
think we have to worry about spammers and ballot stuffers given the
target audience). If we can catch people while they wait for their
pipeline to start up (and/or complete), this is a great time to get
some feedback.

> I agree usage data would be really valuable, but I'm not sure that this 
> approach would get us good data. Is there a way to get download statistics 
> for the different runner artifacts? Maybe that could be a better metric to 
> compare usage.

This'd be useful too, but hard to get and very noisy.

>
> On Mon, Sep 23, 2019 at 2:57 PM Ankur Goenka  wrote:
>>
>> I agree, these are the questions that need to be answered.
>> The data can be anonymize and stored as public data in BigQuery or some 
>> other place.
>>
>> The intent is to get the usage statistics so that we can get to know what 
>> people are using Flink or Spark etc and not intended for discussion or a 
>> help channel.
>> I also think that we don't need to monitor this actively as it's more like a 
>> survey rather than active channel to get issues resolved.
>>
>> If we think its useful for the community then we come up with the solution 
>> as to how can we do this (similar to how we released the container images).
>>
>>
>>
>> On Fri, Sep 20, 2019 at 4:38 PM Kyle Weaver  wrote:
>>>
>>> There are some logistics that would need worked out. For example, Where 
>>> would the data go? Who would own it?
>>>
>>> Also, I'm not convinced we need yet another place to discuss Beam when we 
>>> already have discussed the challenge of simultaneously monitoring mailing 
>>> lists, Stack Overflow, Slack, etc. While "how do you use Beam" is certainly 
>>> an interesting question, and I'd be curious to know that >= X many people 
>>> use a certain runner, I'm not sure answers to these questions are as useful 
>>> for guiding the future of Beam as discussions on the dev/users lists, etc. 
>>> as the latter likely result in more depth/specific feedback.
>>>
>>> However, I do think it could be useful in general to include links directly 
>>> in the console output. For example, maybe something along the lines of "Oh 
>>> no, your Flink pipeline crashed! Check Jira/file a bug/ask the mailing 
>>> list."
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Fri, Sep 20, 2019 at 4:14 PM Ankur Goenka  wrote:

 Hi,

 At the moment we don't really have a good way to collect any usage 
 statistics for Apache Beam. Like runner used etc. As many of the users 
 don't really have a way to report their usecase.
 How about if we create a feedback page where users can add their pipeline 
 details and usecase.
 Also, we can start printing the link to this page when user launch the 
 pipeline in the command line.
 Example:
 $ python my_pipeline.py --runner DirectRunner --input /tmp/abc

 Starting pipeline
 Please use http://feedback.beam.org?args=runner=DirectRunner,input=/tmp/abc
 Pipeline started
 ..

 Using a link and not publishing the data automatically will give user 
 control over what they publish and what they don't. We can enhance the 
 text and usage further but the basic idea is to ask for user feeback at 
 each run of the pipeline.
 Let me know what you think.


 Thanks,
 Ankur


Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
On Thu, Sep 19, 2019 at 4:33 PM Maximilian Michels  wrote:
>
> > This is obviously less than ideal for the user... Should we "fix" the
> > Java SDK? Of is the long-terms solution here to have runners do this
> > rewrite?
>
> I think ideal would be that the Runner adds the Impulse override. That
> way also the Python SDK would not have to have separate code paths for
> Reads.

Or, rather, that the Runner adds the non-Impuls override (in Java and Python).

> On 19.09.19 11:46, Robert Bradshaw wrote:
> > On Thu, Sep 19, 2019 at 11:22 AM Maximilian Michels  wrote:
> >>
> >> The flag is insofar relevant to the PortableRunner because it affects
> >> the translation of the pipeline. Without the flag we will generate
> >> primitive Reads which are unsupported in portability. The workaround we
> >> have used so far is to check for the Runner (e.g. PortableRunner) during
> >> pipeline translation and then add it automatically.
> >>
> >> A search in the Java code base reveals 18 occurrences of the flag, all
> >> inside the Dataflow Runner. This is good because the Java SDK itself
> >> does not make use of it. In portable Java pipelines the pipeline author
> >> has to take care to override primitive reads with the JavaReadViaImpulse
> >> wrapper.
> >
> > This is obviously less than ideal for the user... Should we "fix" the
> > Java SDK? Of is the long-terms solution here to have runners do this
> > rewrite?
> >
> >> On the Python side the IO code uses the flag directly to either generate
> >> a primitive Read or a portable Impulse + ParDoReadAdapter.
> >>
> >> Would it be conceivable to remove the beam_fn_api flag and introduce a
> >> legacy flag which the Dataflow Runner could then use? With more runners
> >> implementing portability, I believe this would make sense.
> >>
> >> Thanks,
> >> Max
> >>
> >> On 18.09.19 18:29, Ahmet Altay wrote:
> >>> I believe the flag was never relevant for PortableRunner. I might be
> >>> wrong as well. The flag affects a few bits in the core code and that is
> >>> why the solution cannot be by just setting the flag in Dataflow runner.
> >>> It requires some amount of clean up. I agree that it would be good to
> >>> clean this up, and I also agree to not rush this especially if this is
> >>> not currently impacting users.
> >>>
> >>> Ahmet
> >>>
> >>> On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels  >>> <mailto:m...@apache.org>> wrote:
> >>>
> >>>   > I disagree that this flag is obsolete. It is still serving a
> >>>  purpose for batch users using dataflow runner and that is decent
> >>>  chunk of beam python users.
> >>>
> >>>  It is obsolete for the PortableRunner. If the Dataflow Runner needs
> >>>  this
> >>>  flag, couldn't we simply add it there? As far as I know Dataflow 
> >>> users
> >>>  do not use the PortableRunner. I might be wrong.
> >>>
> >>>  As Kyle mentioned, he already fixed the issue. The fix is only 
> >>> present
> >>>  in the 2.16.0 release though. This flag has repeatedly caused 
> >>> friction
> >>>  for users and that's why I want to get rid of it.
> >>>
> >>>  There is of course no need to rush this but it would be great to 
> >>> tackle
> >>>  this for the next release. Filed a JIRA:
> >>>  https://jira.apache.org/jira/browse/BEAM-8274
> >>>
> >>>  Cheers,
> >>>  Max
> >>>
> >>>  On 17.09.19 15:39, Kyle Weaver wrote:
> >>>   > Actually, the reported issues are already fixed on head. We're 
> >>> just
> >>>   > trying to prevent similar issues in the future.
> >>>   >
> >>>   > Kyle Weaver | Software Engineer | github.com/ibzib
> >>>  <http://github.com/ibzib>
> >>>   > <http://github.com/ibzib> | kcwea...@google.com
> >>>  <mailto:kcwea...@google.com> <mailto:kcwea...@google.com
> >>>  <mailto:kcwea...@google.com>>
> >>>   >
> >>>   >
> >>>   > On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay  >>>  <mailto:al...@google.com>
> >>>   > <mailto:al...@google.com <mai

Re: Plan for dropping python 2 support

2019-09-19 Thread Robert Bradshaw
Oh, and one more thing, I think it'd make sense for Apache Beam to
sign https://python3statement.org/. The promise is that we'd
discontinue Python 2 support *in* 2020, which is not committing us to
January if we're not ready. Worth a vote?


On Thu, Sep 19, 2019 at 3:58 PM Robert Bradshaw  wrote:
>
> Exactly how long we support Python 2 depends on our users. Other than
> those that speak up (such as yourself, thanks!), it's hard to get a
> handle on how many need Python 2 and for how long. (Should we send out
> a survey? Maybe after some experience with 2.16?)
>
> On the one hand, the whole ecosystem is finally moving on, and even if
> Beam continues to support Python 2 our dependencies, or other projects
> that are being used in conjunction with Beam, will also be going
> Python 3 only. On the other hand, Beam is, admittedly, quite late to
> the party and could be the one holding people back, and looking at how
> long it took us, if we just barely make it by the end of the year it's
> unreasonable to say at that point "oh, and we're dropping 2.7 at the
> same time."
>
> The good news is that 2.16 is shaping up to be a release I would
> recommend everyone migrate to Python 3 on. The remaining issues are
> things like some issues with main sessions (which already has issues
> in Python 2) and not supporting keyword-only arguments (a new feature,
> not a regression). I would guess that even 2.15 is already good enough
> for most people, at least to kick the tires and running tests to start
> the effort.
>
> (I also agree with the sentiment that once we go 3.x only, it'll be
> likely harder to maintain a 2.x LTS... but the whole LTS thing is
> being discussed in another thread.)
>
> On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  wrote:
> >
> > Hi all,
> > I had a read through this thread in the archives. It occurred before I 
> > joined the mailing list, so I hope that this email connects up with the 
> > thread properly for everyone.
> >
> > I'd like to respond to the following points:
> >
> >> I believe we are referring to two separate things with support:
> >> - Supporting existing releases for patches - I agree that we need to give
> >> users a long enough window to upgrade. Great if it happens with an LTS
> >> release. Even if it does not, I think it will be fair to offer patches on
> >> the last python 2 supporting release during some part of 2020 if that
> >> becomes necessary.
> >> - Making new releases with python 2 support - Each new Beam release with
> >> python 2 support will implicitly extend the lifetime of beam's python 2
> >> support. I do not think we need to extend this to beyond 2019. 2 releases
> >> (~ 3 months) after solid python 3 support will very likely put the last
> >> python 2 supporting release to last quarter of 2019 already.
> >
> >
> > With so many important features still under active development 
> > (portability, expansion, external IO transforms, schema coders) and new 
> > versions of executors tied to the Beam source, staying behind is not really 
> > an option for many of us, and with python3 support not yet fully completed, 
> > the window in which Beam is fully working for both python versions is 
> > rapidly approaching 2 months, and could ultimately be even less, depending 
> > on how long it takes to complete the dozen remaining issues in Jira, and 
> > whatever pops up thereafter.
> >
> >> The cost of maintaining Python 2.7 support is higher than 0. Some issues
> >> that come to mind:
> >> - Maintaining Py2.7 / Py 3+ compatibility of Beam codebase makes it
> >> difficult to use Python 3 syntax in Beam which may be necessary to support
> >> and test syntactic constructs introduced in Python 3.
> >> - Running additional test suites increases the load on test infrastructure
> >> and increases flakiness.
> >
> >
> > I would argue that the cost of maintaining a python2-only LTS version will 
> > be far greater than maintaining python2 support for a little while longer.  
> > Dropping support for python2 could mean a number of things from simply 
> > disabling the python2 tests, to removing 2-to-3 idioms in favor of 
> > python3-only constructs.  If what you have in mind is anything like the 
> > latter then the master branch will become quite divergent from the LTS 
> > release, and backporting changes will be not be as simple as cherry-picking 
> > commits.  All-in-all, I think it's a lose/lose for everyone -- users and 
> > developers, of which I am both -- to drop python2 support on such 

Re: Plan for dropping python 2 support

2019-09-19 Thread Robert Bradshaw
Exactly how long we support Python 2 depends on our users. Other than
those that speak up (such as yourself, thanks!), it's hard to get a
handle on how many need Python 2 and for how long. (Should we send out
a survey? Maybe after some experience with 2.16?)

On the one hand, the whole ecosystem is finally moving on, and even if
Beam continues to support Python 2 our dependencies, or other projects
that are being used in conjunction with Beam, will also be going
Python 3 only. On the other hand, Beam is, admittedly, quite late to
the party and could be the one holding people back, and looking at how
long it took us, if we just barely make it by the end of the year it's
unreasonable to say at that point "oh, and we're dropping 2.7 at the
same time."

The good news is that 2.16 is shaping up to be a release I would
recommend everyone migrate to Python 3 on. The remaining issues are
things like some issues with main sessions (which already has issues
in Python 2) and not supporting keyword-only arguments (a new feature,
not a regression). I would guess that even 2.15 is already good enough
for most people, at least to kick the tires and running tests to start
the effort.

(I also agree with the sentiment that once we go 3.x only, it'll be
likely harder to maintain a 2.x LTS... but the whole LTS thing is
being discussed in another thread.)

On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  wrote:
>
> Hi all,
> I had a read through this thread in the archives. It occurred before I joined 
> the mailing list, so I hope that this email connects up with the thread 
> properly for everyone.
>
> I'd like to respond to the following points:
>
>> I believe we are referring to two separate things with support:
>> - Supporting existing releases for patches - I agree that we need to give
>> users a long enough window to upgrade. Great if it happens with an LTS
>> release. Even if it does not, I think it will be fair to offer patches on
>> the last python 2 supporting release during some part of 2020 if that
>> becomes necessary.
>> - Making new releases with python 2 support - Each new Beam release with
>> python 2 support will implicitly extend the lifetime of beam's python 2
>> support. I do not think we need to extend this to beyond 2019. 2 releases
>> (~ 3 months) after solid python 3 support will very likely put the last
>> python 2 supporting release to last quarter of 2019 already.
>
>
> With so many important features still under active development (portability, 
> expansion, external IO transforms, schema coders) and new versions of 
> executors tied to the Beam source, staying behind is not really an option for 
> many of us, and with python3 support not yet fully completed, the window in 
> which Beam is fully working for both python versions is rapidly approaching 2 
> months, and could ultimately be even less, depending on how long it takes to 
> complete the dozen remaining issues in Jira, and whatever pops up thereafter.
>
>> The cost of maintaining Python 2.7 support is higher than 0. Some issues
>> that come to mind:
>> - Maintaining Py2.7 / Py 3+ compatibility of Beam codebase makes it
>> difficult to use Python 3 syntax in Beam which may be necessary to support
>> and test syntactic constructs introduced in Python 3.
>> - Running additional test suites increases the load on test infrastructure
>> and increases flakiness.
>
>
> I would argue that the cost of maintaining a python2-only LTS version will be 
> far greater than maintaining python2 support for a little while longer.  
> Dropping support for python2 could mean a number of things from simply 
> disabling the python2 tests, to removing 2-to-3 idioms in favor of 
> python3-only constructs.  If what you have in mind is anything like the 
> latter then the master branch will become quite divergent from the LTS 
> release, and backporting changes will be not be as simple as cherry-picking 
> commits.  All-in-all, I think it's a lose/lose for everyone -- users and 
> developers, of which I am both -- to drop python2 support on such a short 
> timeline.
>
> I'm an active contributor to this project and it will put me and the company 
> that I work for in a very bad position if you force us onto an LTS release in 
> early 2020.  I understand the appeal of moving to python3-only code and I 
> want to get there too, but I would hope that you give your users are much 
> time to transition their own code as the Beam project itself has taken.  I'm 
> not asking for a full 12 months to transition, but more than a couple will be 
> required.
>
> thanks,
> -chad
>
>
>
>


Re: Next LTS?

2019-09-19 Thread Robert Bradshaw
In many ways the 2.7 LTS was trying to flesh out the process. I think
we learned some valuable lessons. It would have been good to push out
something (even if it didn't have everything we wanted) but that is
unlikely to be worth pursuing now (and 2.7 should probably be retired
as LTS and no longer recommended).

I agree that it does not seem there is strong demand for an LTS at
this point. I would propose that we keep 2.16, etc. as potential
candidates, but only declare one as LTS pending demand. The question
of how to keep our tooling stable (or backwards/forwards compatible)
is a good one, especially as we move to drop Python 2.7 in 2020 (which
could itself be a driver for an LTS).

On Thu, Sep 19, 2019 at 12:27 PM Kenneth Knowles  wrote:
>
> Yes, I pretty much dropped 2.7.1 release process due to lack of interest.
>
> There are known problems so that I cannot recommend anyone to use 2.7.0, yet 
> 2.7 it is the current LTS family. So my work on 2.7.1 was philosophical. I 
> did not like the fact that we had a designated LTS family with no usable 
> releases.
>
> But many backports were proposed to block 2.7.1 and took a very long time to 
> get contirbutors to implement the backports. I ended up doing many of them 
> just to move it along. This indicates a lack of interest to me. The problem 
> is that we cannot really use a strict cut off date as a way to ensure people 
> do the important things and skip the unimportant things, because we do know 
> that the issues are critical.
>
> And, yes, the fact that Jenkins jobs are separately evolving but pretty 
> tightly coupled to the repo contents is a serious problem that I wish we had 
> fixed. So verification of each PR was manual.
>
> Altogether, I still think LTS is valuable to have as a promise to users that 
> we will backport critical fixes. I would like to keep that promise and 
> continue to try. Things that are rapidly changing (which something always 
> will be) just won't have fixes backported, and that seems OK.
>
> Kenn
>
> On Thu, Sep 19, 2019 at 10:59 AM Maximilian Michels  wrote:
>>
>> An LTS only makes sense if we end up patching the LTS, which so far we
>> have never done. There has been work done in backporting fixes, see
>> https://github.com/apache/beam/commits/release-2.7.1 but the effort was
>> never completed. The main reason I believe were complications with
>> running the evolved release scripts against old Beam versions.
>>
>> Now that the portability layer keeps maturing, it makes me optimistic
>> that we might have a maintained LTS in the future.
>>
>> -Max
>>
>> On 19.09.19 08:40, Ismaël Mejía wrote:
>> > The fact that end users never asked AFAIK in the ML for an LTS and for
>> > a subsequent minor release of the existing LTS shows IMO the low
>> > interest on having a LTS.
>> >
>> > We still are heavily iterating in many areas (portability/schema) and
>> > I am not sure users (and in particular users of open source runners)
>> > get a big benefit of relying on an old version. Maybe this is the
>> > moment to reconsider if having a LTS does even make sense given (1)
>> > that our end user facing APIs are 'mostly' stable (even if many still
>> > called @Experimental). (2) that users get mostly improvements on
>> > runners translation and newer APIs with a low cost just by updating
>> > the version number, and (3) that in case of any regression in an
>> > intermediary release we still can do a minor release even if we have
>> > not yet done so, let's not forget that the only thing we need to do
>> > this is enough interest to do the release from the maintainers.
>> >
>> >
>> > On Tue, Sep 17, 2019 at 12:00 AM Valentyn Tymofieiev
>> >  wrote:
>> >>
>> >> I support nominating 2.16.0 as LTS release since in has robust Python 3 
>> >> support compared with prior releases, and also for reasons of pending 
>> >> Python 2 deprecation. This has been discussed before [1]. As Robert 
>> >> pointed out in that thread, LTS nomination in Beam is currently 
>> >> retroactive. If we keep the retroactive policy, the question is how long 
>> >> we should wait for a release to be considered "safe" for nomination.  
>> >> Looks like in case of 2.7.0 we waited a month, see [2,3].
>> >>
>> >> Thanks,
>> >> Valentyn
>> >>
>> >> [1] 
>> >> https://lists.apache.org/thread.html/eba6caa58ea79a7ecbc8560d1c680a366b44c531d96ce5c699d41535@%3Cdev.beam.apache.org%3E
>> >> [2] https://beam.apache.org/blog/2018/10/03/beam-2.7.0.html
>> >> [3] 
>> >> https://lists.apache.org/thread.html/896cbc9fef2e60f19b466d6b1e12ce1aeda49ce5065a0b1156233f01@%3Cdev.beam.apache.org%3E
>> >>
>> >> On Mon, Sep 16, 2019 at 2:46 PM Austin Bennett 
>> >>  wrote:
>> >>>
>> >>> Hi All,
>> >>>
>> >>> According to our policies page [1]: "There will be at least one new LTS 
>> >>> release in a 12 month period, and LTS releases are considered deprecated 
>> >>> after 12 months"
>> >>>
>> >>> The last LTS was released 2018-10-02 [2].
>> >>>
>> >>> Does that mean the next release (2.16) shoul

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
would like to give
> > enough time
> >  > to decouple the flag from the core code. (With a quick search
> > I saw
> >  > two instances related to Read and Create.) Have time to test
> > changes
> >  > and then switch the default.
> >  >
> >  >
> >  > An isinstance check might be smarter, but does not get rid of
> >  > the root
> >  > of the problem.
> >  >
> >  >
> >  > I might be wrong, IIUC, it will temporarily resolve the reported
> >  > issues. Is this not accurate?
> >  >
> >  >
> >  > -Max
> >  >
> >  > On 17.09.19 14:20, Ahmet Altay wrote:
> >  >  > Could you make that change and see if it would have
> > addressed
> >  > the issue
> >  >  > here?
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver
> >  > mailto:kcwea...@google.com>
> > <mailto:kcwea...@google.com <mailto:kcwea...@google.com>>
> >  >  > <mailto:kcwea...@google.com
> > <mailto:kcwea...@google.com> <mailto:kcwea...@google.com
> > <mailto:kcwea...@google.com>>>> wrote:
> >  >  >
> >  >  > The flag is automatically set, but not in a smart
> > way. Taking
> >  >  > another look at the code, a more resilient fix
> > would be
> >  > to just
> >  >  > check if the runner isinstance of PortableRunner.
> >  >  >
> >  >  > Kyle Weaver | Software Engineer | github.com/ibzib
> > <http://github.com/ibzib>
> >  > <http://github.com/ibzib>
> >  >  > <http://github.com/ibzib> | kcwea...@google.com
> > <mailto:kcwea...@google.com>
> >  > <mailto:kcwea...@google.com <mailto:kcwea...@google.com>>
> >  >  > <mailto:kcwea...@google.com
> > <mailto:kcwea...@google.com> <mailto:kcwea...@google.com
> > <mailto:kcwea...@google.com>>>
> >  >  >
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay
> >  > mailto:al...@google.com>
> > <mailto:al...@google.com <mailto:al...@google.com>>
> >  >  > <mailto:al...@google.com <mailto:al...@google.com>
> > <mailto:al...@google.com <mailto:al...@google.com>>>> wrote:
> >  >  >
> >  >  > Is not this flag set automatically for the
> > portable
> >  > runner here
> >  >  > [1] ?
> >  >  >
> >  >  > [1]
> >  >  >
> >  >
> > 
> > https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw
> >  >  >  > <mailto:rober...@google.com> <mailto:rober...@google.com
> > <mailto:rober...@google.com>>
> >  > <mailto:rober...@google.com <mailto:rober...@google.com>
> > <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise
> >  > mailto:t...@apache.org>
> > <mailto:t...@apache.org <mailto:t...@apache.org>>
> >  >  > <mailto:t...@apache.org
> > <mailto:t...@apache.org> <mailto:t...@apache.org
> > <mailto:t...@apache.org>>>>
> >  > wrote:
> >  >  >  >
> >  >  >  > +1 for making --experiments=beam_fn_api
> > default.
> >  >  >  >
> >  >  >  > Can the Dataflow runner driver just
> > remove the
> >  > setting if
> >  >  > it is not compatible?
> >  >  >
> &g

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-17 Thread Robert Bradshaw
On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise  wrote:
>
> +1 for making --experiments=beam_fn_api default.
>
> Can the Dataflow runner driver just remove the setting if it is not 
> compatible?

The tricky bit would be undoing the differences in graph construction
due to this flag flip. But I would be in favor of changing the default
(probably just removing the flag) and moving the non-portability parts
into the dataflow runner itself. (It looks like the key differences
here are for the Create and Read transforms.)

> On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels  wrote:
>>
>> +dev
>>
>> The beam_fn_api flag and the way it is automatically set is error-prone.
>> Is there anything that prevents us from removing it? I understand that
>> some Runners, e.g. Dataflow Runner have two modes of executing Python
>> pipelines (legacy and portable), but at this point it seems clear that
>> the portability mode should be the default.
>>
>> Cheers,
>> Max
>>
>> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
>>  wrote:
>>
>> Kyle
>>
>> Thank you for the assistance.
>>
>> By specifying "experiments" in PipelineOptions ,
>> ==
>>  options = PipelineOptions([
>>"--runner=FlinkRunner",
>>"--flink_version=1.8",
>>"--flink_master_url=localhost:8081",
>>"--experiments=beam_fn_api"
>>])
>> ==
>>
>> I was able to submit the job successfully.
>>
>> [grpc-default-executor-0] INFO
>> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
>> BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> [grpc-default-executor-0] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
>> Starting job invocation
>> BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
>> pipeline to Flink program.
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
>> a Batch Execution Environment.
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
>> Flink Master URL localhost:8081.
>> [flink-runner-job-invoker] WARN
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
>> default parallelism could be found. Defaulting to parallelism 1.
>> Please set an explicit parallelism with --parallelism
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.api.java.ExecutionEnvironment - The job has 0
>> registered types and 0 default Kryo serializers
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.configuration.Configuration - Config uses fallback
>> configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.client.program.rest.RestClusterClient - Submitting
>> job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>
>> Thanks,
>> Yu Watanabe
>>
>> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver > > wrote:
>>
>> Try adding "--experiments=beam_fn_api" to your pipeline options.
>> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib
>>  | kcwea...@google.com
>> 
>>
>>
>> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
>> mailto:yu.w.ten...@gmail.com>> wrote:
>>
>> Hello.
>>
>> I am trying to spin up the flink runner but looks like data
>> serialization is failing.
>> I would like to ask for help to get over with this error.
>>
>> 
>> 
>> [flink-runner-job-invoker] ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
>> - Error during job invocation
>> 
>> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> java.lang.IllegalArgumentException: unable to deserialize
>> BoundedSource
>>  at
>> 
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>  at
>> 
>> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>  at
>> 
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineT

Re: The state of external transforms in Beam

2019-09-16 Thread Robert Bradshaw
Thanks for bringing this up again. My thoughts on the open questions below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> That commit solves 2 problems:
>
> Adds the pubsub Java deps so that they’re available in our portable pipeline
> Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
> available as a standard coder. This is required because both PubsubIO.Read 
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
> objects, but only “standard” (i.e. portable) coders can be used, so we have 
> to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> There’s a similar magic commit required for Kafka external transforms
> The Jira issue for this problem is here: 
> https://jira.apache.org/jira/browse/BEAM-7870
> For problem #2 above there seems to be some consensus forming around using 
> Avro or schema/row coders to send compound types in a portable way. Here’s 
> the PR for making row coders portable
> https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to automatically
convert from various Java classes to schemas transparently, so this
same logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would allow cross-language. Even if that doesn't
work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external (though the goal of the former work is to make
this step unnecissary).

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better as
maven/pypi/... dependencies (with version ranges) such that
environment merging and dependency resolution can be sanely done), and
a way for the expansion service to provide such dependencies.

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue that one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.

> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.

+1


On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
>
> Hi all,
> There was some interest in this topic at the Beam Summit this week (btw, 
> great job to everyone involved!), so I thought I’d try to summarize the 
> current state of things.
> First, let me explain the idea behind an external transforms for the 
> uninitiated.
>
> Problem:
>
> there’s a transform that you want to use, but it’s not available in your 
> desired language. IO connectors are a good example: there are many available 
> in the Java SDK, but not so much in Python or Go.
>
> Solution:
>
> Create a stub transform in your desired language (e.g. Python) whose primary 
> role is to serialize the parameters passed to that transform
> When you run your portable pipeline, just prior to it being sent to the Job 
> Service for execution, your stub transform’s payload is first sent to the 
> “Expansion Service” that’s running in the native language (Java), where the 
> payload is used to construct an instance of the native transform, which is 
> then expanded and converted to a protobuf and sent back to the calling 
> process (Python).
> The protobuf representation of the expanded transform gets integrated back 
> into the pipeline that you’re submitting
> Steps 2-3 are repeated for each external transform in your pipeline
> Then the whole pipeline gets sent to the Job Service to be invoked on 
> Flink/Spark/etc
>
> 
>
> Now on to my journey to get PubsubIO working in python on Flink.
>
> The first issue I encountered was that there was a lot of boilerplate 
> involved in serializing the stub python transform’s parameters so they can be 
> sent to the expansion service.
>
> I created a PR to make this simpler, which has just been merged to master: 
> https://github.com/apache/beam/pull/9098
>
> With this feature in place, if you’re using python 3.7 you can use a 
> dataclass and the typing module to crea

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Robert Bradshaw
Note that loopback won't fix the problem for, say, cross-language IOs. But,
yes, it's really handy and should probably be used more.

On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik  wrote:

> And/or update the wiki/website with some how to's...
>
> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise  wrote:
>
>> I agree that loopback would be preferable for this purpose. I just wasn't
>> aware this even works with the portable Flink runner. Is it one of the best
>> guarded secrets? ;-)
>>
>> Kyle, can you please post the pipeline options you would use for Flink?
>>
>>
>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:
>>
>>> I prefer loopback because a) it writes output files to the local
>>> filesystem, as the user expects, and b) you don't have to pull or build
>>> docker images, or even have docker installed on your system -- which is one
>>> less point of failure.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:
>>>
>>>> This should become much better with 2.16 when we have the Docker images
>>>> prebuilt.
>>>>
>>>> Docker is probably still the best option for Python on a JVM based
>>>> runner in a local environment that does not have a development setup.
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> +dev  I think we should probably point new users
>>>>> of the portable Flink/Spark runners to use loopback or some other
>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>> isn't really needed to run a word count example. For example, Yu's 
>>>>> pipeline
>>>>> errored here because the expected Docker container wasn't built before
>>>>> running.
>>>>>
>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>> kcwea...@google.com
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On this note, making local files easy to read is something we'd
>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>> This could be useful not just for running with docker and the portable
>>>>>> runner locally, but more generally when running on a distributed system
>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if 
>>>>>> we
>>>>>> could automatically stage local files to be read as artifacts that could 
>>>>>> be
>>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>>> outputs back to the local machine (with the similar optimization for 
>>>>>> local
>>>>>> docker).
>>>>>>
>>>>>> At the very least, however, obvious messaging when the local
>>>>>> filesystem is used from within docker, which is often a (non-obvious and
>>>>>> hard to debug) mistake should be added.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik 
>>>>>> wrote:
>>>>>>
>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>> * Mounting an external directory into the container so that any
>>>>>>> "local" writes appear outside the container
>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>
>>>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello.
>>>>>>>>
>>>>>>>> I would like to ask for help with my sample code using portable
>>>>>>>> runner using apache flink.
>>>>>>>> I was able to work out the wordcount.py using this page.
>>>>>>>>
>>&g

Re: [discuss] How we support our users on Slack / Mailing list / StackOverflow

2019-09-06 Thread Robert Bradshaw
I would also suggest SO as the best alternative, especially due to its
indexability and searchability. If discussion is needed, the users
list (my preference) or slack can be good options, and ideally the
resolution is brought back to SO.

On Fri, Sep 6, 2019 at 1:10 PM Udi Meiri  wrote:
>
> I don't go on Slack, but I will be notified of mentions. It has the advantage 
> of being an informal space.
> SO can feel just as intimidating as the mailing list IMO. Unlike the others, 
> it doesn't lend itself very well to discussions (you can only post comments 
> or answers).
>
>
>
> On Fri, Sep 6, 2019 at 10:55 AM Pablo Estrada  wrote:
>>
>> Hello all,
>>
>> THE SITUATION:
>> It was brought to my attention recently that Python users in Slack are not 
>> getting much support, because most of the Beam Python-knowledgeable people 
>> are not on Slack. Unfortunately, in the Beam site, we do refer people to 
>> Slack for assistance[1].
>>
>> Java users do receive reasonable support, because there are enough Beam 
>> Java-knowledgeable people online, and willing to answer.
>>
>> On the other hand, at Google we do have a number of people who are 
>> responsible to answer questions on StackOverflow[2], and we do our best to 
>> answer promptly. I think we do a reasonable job overall.
>>
>> SO LET'S DISCUSS:
>> How should we advise the community to ask questions about Beam?
>> - Perhaps we should encourage people to try the mailing list first
>> - Perhaps we should encourage people to try StackOverflow first
>> - Perhaps we should write a bot that encourages Python users to go to 
>> StackOverflow
>> - something else?
>>
>> My personal opinion is that a mailing list is not great: It's intimidating, 
>> it does not provide great indexing or searchability.
>>
>> WHAT I PROPOSE:
>>
>> I think explicitly encouraging everyone to go to StackOverflow first will be 
>> the best alternative: It's indexed, searchable, less intimidating than the 
>> mailing list. We can add that they can try Slack as well - without any 
>> guarantees.
>>
>> What do others think?
>> -P.
>>
>> [1] https://beam.apache.org/community/contact-us/
>> [2] https://stackoverflow.com/questions/tagged/apache-beam?tab=Newest


Re: Stop publishing unneeded Java artifacts

2019-09-03 Thread Robert Bradshaw
:sdks:java:testing:expansion-service could be useful to publish for
testing as well.

On Fri, Aug 30, 2019 at 3:13 PM Lukasz Cwik  wrote:
>
> Google internally relies on being able to get the POM files generated for:
> :sdks:java:testing:nexmark
> :sdks:java:testing:test-utils
>
> Generating the POM files currently relies on publishing being enabled for 
> those projects so could we keep publishing those two modules. Disabling the 
> others sounds fine to me.
>
> On Thu, Aug 29, 2019 at 9:41 PM Lukasz Cwik  wrote:
>>
>> I wanted to double check that we don't rely on this publishing inside Google 
>> for some reason. Will update this thread tomorrow.
>>
>> On Wed, Aug 28, 2019 at 7:11 AM Łukasz Gajowy  wrote:
>>>
>>> Hi all,
>>>
>>> I wanted to notify that in PR 9417 I'm planning to turn off publishing of 
>>> the following modules' artifacts to the maven repository:
>>>
>>> :runners:google-cloud-dataflow-java:worker:windmill
>>> :sdks:java:build-tools
>>> :sdks:java:javadoc
>>> :sdks:java:testing:expansion-service
>>> :sdks:java:io:bigquery-io-perf-tests
>>> :sdks:java:io:file-based-io-tests
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-2
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-5
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-6
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-common
>>> :sdks:java:testing:load-tests
>>> :sdks:java:testing:nexmark
>>> :sdks:java:testing:test-utils
>>>
>>> AFAIK, the purpose of these modules is to keep related 
>>> tests/test-utils/utils together. We are not expecting users to make use of 
>>> such artifacts. Please let me know if you have any objections. If there are 
>>> none and the PR gets merged, the artifacts will no longer be published.
>>>
>>> Thanks!
>>> Łukasz


Re: Improve container support

2019-08-27 Thread Robert Bradshaw
On Tue, Aug 27, 2019 at 6:20 PM Ahmet Altay  wrote:
>
> On Tue, Aug 27, 2019 at 5:50 PM Robert Bradshaw  wrote:
>>
>> On Tue, Aug 27, 2019 at 3:35 PM Hannah Jiang  wrote:
>> >
>> > Hi team
>> >
>> > I am working on improving docker container support for Beam. We would like 
>> > to publish prebuilt containers for each release version and daily 
>> > snapshot. Current work focuses on release images only and it would be part 
>> > of the release process.
>>
>> This would be great!
>>
>> > The release images will be pushed to GCR which is publicly 
>> > accessible(pullable). We will use the following locations.
>> > Repository: gcr.io/beam
>> > Project: apache-beam-testing
>>
>> Given that these are release artifacts, we should use a project with
>> more restricted access than "anyone who opens a PR on github."
>
>
> We have two options:
> -  gcr.io works based on the permissions of the gcs bucket that is backing 
> it. GCS supports bucket only permissions. These permissions needs to be 
> explicitly granted and the service accounts used by jenkins jobs does not 
> have these explicit permissions today.
> - we can create a new project in gcr, bintray or anything else that offers 
> the same service.

I think the cleanest is to simply have a new project whose membership
consists of (interested) PMC members. If we have to populate this
manually I think that'd still be OK as the churn is quite low.


Re: Improve container support

2019-08-27 Thread Robert Bradshaw
On Tue, Aug 27, 2019 at 3:35 PM Hannah Jiang  wrote:
>
> Hi team
>
> I am working on improving docker container support for Beam. We would like to 
> publish prebuilt containers for each release version and daily snapshot. 
> Current work focuses on release images only and it would be part of the 
> release process.

This would be great!

> The release images will be pushed to GCR which is publicly 
> accessible(pullable). We will use the following locations.
> Repository: gcr.io/beam
> Project: apache-beam-testing

Given that these are release artifacts, we should use a project with
more restricted access than "anyone who opens a PR on github."

> More details, including naming and tagging scheme, can be found at wiki which 
> is written by several contributors.

Would it make sense to put this in a format more amenable to commenting?

> I would like to discuss these two questions.
> 1. How many tests do we need to run before pushing images to gcr?
> Publishing artifacts is the last step of the release process, so at this 
> moment, we already verified all codebase. In addition, many Jenkins tests use 
> containers, so it is already verified several times. Do we need to run it 
> again?
>
> 2. How many tests do we need to run to validate pushed images?
> When we push the images, we assume the images would work and pass all the 
> tests. After pushing, we should confirm the images are pullable and useable. 
> I suggest we run several tests on dataflow with each pushed image. What do 
> you think?

I think the release manager publishing these images as part of the
release process (just like publishing to the maven repo and svn) and
validation happens as part of validating the artifacts during the
vote.


Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  wrote:
>
> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
> // A flag to indicate a cache token is valid for user state.
> message UserState {}
>
> // A flag to indicate a cache token is valid for a side input.
> message SideInput {
>   // The id of a side input.
>   string side_input = 1;
> }
>
> // The scope of a cache token.
> oneof type {
>   UserState user_state = 1;
>   SideInput side_input = 2;
> }
>
> // The cache token identifier which should be globally unique.
> bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
>
> SideInputState -> SideInput (side_input_state -> side_input)
> + more comments around the messages and the fields.
>
>
> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  wrote:
>>
>> We would have to differentiate cache tokens for user state and side inputs. 
>> How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>> message UserState {
>> }
>>
>> message SideInputState {
>>   string side_input_id = 1;
>> }
>>
>> oneof type {
>>   UserState user_state = 1;
>>   SideInputState side_input_state = 2;
>> }
>>
>> bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundles view of side inputs should never change during processing and 
>> should have a point in time snapshot.
>>
>> I was just trying to say that the cache token for side inputs being deferred 
>> till side input request time simplified the runners implementation since 
>> that is conclusively when the runner would need to take a look at the side 
>> input. Putting them as part of the ProcesBundleRequest complicates that but 
>> does make the SDK implementation significantly simpler which is a win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:
>>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instance where side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated. In that regard
>>> per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of on
>>> > re

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
On Sun, Aug 18, 2019 at 7:30 PM Rakesh Kumar  wrote:
>
> not to completely hijack Max's question but a tangential question regarding 
> LRU cache.
>
> What is the preferred python library for LRU cache?
> I noticed that cachetools [1] is used as one of the dependencies for GCP [2]. 
> Cachetools[1] has LRU cache and it supports Python 2 & 3. It can potentially 
> support our use case.  Can we move cachetools to the required pacakge list 
> [3] and use it for cross bundle caching?
>
> 1. https://pypi.org/project/cachetools/
> 2. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L143
> 3. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L104

cachetools sounds like a fine choice to me.


[ANNOUNCE] New committer: Valentyn Tymofieiev

2019-08-26 Thread Robert Bradshaw
Hi,

Please join me and the rest of the Beam PMC in welcoming a new
committer: Valentyn Tymofieiev

Valentyn has made numerous contributions to Beam over the last several
years (including 100+ pull requests), most recently pushing through
the effort to make Beam compatible with Python 3. He is also an active
participant in design discussions on the list, participates in release
candidate validation, and proactively helps keep our tests green.

In consideration of Valentyn's contributions, the Beam PMC trusts him
with the responsibilities of a Beam committer [1].

Thank you, Valentyn, for your contributions and looking forward to many more!

Robert, on behalf of the Apache Beam PMC

[1] 
https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Brief of interactive Beam

2019-08-26 Thread Robert Bradshaw
On Fri, Aug 23, 2019 at 4:25 PM Ning Kang  wrote:

> On Aug 23, 2019, at 3:09 PM, Robert Bradshaw  wrote:
>
> Cool, sounds like we're getting closer to the same page. Some more replies
> below.
>
> On Fri, Aug 23, 2019 at 1:47 PM Ning Kang  wrote:
>
>> Thanks for the feedback, Robert! I think I got your idea.
>> Let me summarize it to see if it’s correct:
>> 1. You want everything about
>>
>> standard Beam concepts
>>
>>  to follow existing pattern: so we can shot down create_pipeline() and
>> keep the InteractiveRunner notion when constructing pipeline, I agree with
>> it. A runner can delegate another runner, also agreed. Let’s keep it that
>> way.
>>
>
> Despite everything I've written, I'm not convinced that exposing this as a
> Runner is the most intuitive way to get interactivity either. Given that
> the "magic" of interactivity is being able to watch PCollections (for
> inspection and further construction), and if no PCollecitons are watched
> execution proceeds as normal, what are your thoughts about making all
> pipelines "interactive" and just doing the magic iff there are PCollections
> to watch? (The opt-in incantation here would be ibeam.watch(globals()) or
> similar.)
>
> FWIW, Flume has something similar (called marking collections as to be
> materialized). It has its pros and cons.
>
> By default __main__ is watched, similar to the watch(globals()). If no
> PCollection variable is being watched, it’s not doing any magic.
> I’m not sure about making all pipelines “interactive” such as by adding an
> “interactive=True/False” option when constructing pipeline.
>

My point was that watch(globals()) (or anything else) would be the explicit
op in to interactive, instead of doing interactive=True or manually
constructing an InteractiveRunner or anything else.


> Since we couldn’t decide which one is more intuitive, I would stick to the
> existing InteractiveRunner constructor that is open sourced.
> And we try to avoid changing any code outside …/runners/interactive/.
>
> Yes, we can stick with what's already there for now to avoid blocking any
implementation work.

> 2. watch() and visualize() can be in the independent interactive beam
>> module since they are
>>
>> concepts that are unique to being interactive
>>
>> 3. I'll add some example for the run_pipeline() in design doc. The short
>> answer is run_pipeline() != p.run(). Thanks for sharing the doc (
>> https://s.apache.org/no-beam-pipeline).
>> As described in the doc, when constructing the pipeline, we still want to
>> bundle a runner and options to the constructed pipeline even in the future.
>> So if the runner is InteractiveRunner, the interactivity instrument
>> (implicitly applied read/write cache PTransform and input/output wiring) is
>> only applied when "run_pipeline()" of the runner implementation is invoked.
>> p.run() will apply the instrument. However, this static function
>> run_pipeline() takes in a new runner and options,
>> invoking “run_pipeline()” implementation of the new runner and wouldn’t
>> have the instrument, thus no interactivity.
>> Because you cannot (don’t want to, as seen in the doc, users cannot
>> access the bundled pipeline/options in the future) change the runner easily
>> without re-executing all the notebook cells, this shorthand function allows
>> a user to run pipeline without interactivity immediately anywhere in a
>> notebook. In the meantime, the pipeline is still bundled with the original
>> Interactive Runner. The users can keep developing further pipelines.
>> The usage of this function is not intuitive until you put it in a
>> notebook user scenario where users develop, test in prod-like env and
>> develop further. And it’s equivalent to users writing
>> "from_runner_api(to_runner_api(pipeline))” in their notebook. It’s just a
>> shorthand.
>>
>
> What you're trying to work around here is the flaw in the existing API
> that a user binds the choice of Runner before pipeline construction, rather
> than at the point of execution. I propose we look at fixing this in Beam
> itself.
>
> Then I would propose not exposing this. If late runner binding is
> supported, we wouldn’t even need this. We can write it in an example
> notebook rather than exposing it.
>

Sounds good.


> 4. And we both agree that implicit cache is palatable and should be the
>> only thing we use to support interactivity. Cache and watched pipeline
>> definition (which tells us what to cache) are the main “hidden state” I
>> meant. Because the cache mechanism is totally implicit and hidden

Re: Python question about save_main_session

2019-08-23 Thread Robert Bradshaw
I suggest re-writing the test to avoid save_main_session.

On Fri, Aug 23, 2019 at 11:57 AM Udi Meiri  wrote:

> Hi,
> I'm trying to get pytest with the xdist plugin to run Beam tests. The
> issue is with save_main_session and a dependency of pytest-xdist called
> execnet, which triggers this error:
>
> *apache_beam/examples/complete/tfidf.py*:212: in run*output | 'write' >> 
> WriteToText(known_args.output)**apache_beam/pipeline.py*:426: in __exit__*
> self.run().wait_until_finish()**apache_beam/pipeline.py*:406: in run*
> self._options).run(False)**apache_beam/pipeline.py*:416: in run*
> pickler.dump_session(os.path.join(tmpdir, 
> 'main_session.pickle'))**apache_beam/internal/pickler.py*:282: in 
> dump_session*
> dill.load_session(file_path)**../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/dill/_dill.py*:410:
>  in load_session*module = 
> unpickler.load()**../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py*:130:
>  in __getattr__*locs = 
> self._importdef.get(name)**../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py*:130:
>  in __getattr__*locs = 
> self._importdef.get(name)**../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py*:130:
>  in __getattr__*locs = self._importdef.get(name)**E   RecursionError: 
> maximum recursion depth exceeded*
> !!! Recursion detected (same locals & position)
>
>
> Does anyone on this list have experience with these kinds of errors? Any
> workarounds I can use? (can we handle this module specially / can we
> exclude it from main session?)
>


Re: Brief of interactive Beam

2019-08-23 Thread Robert Bradshaw
 should be tailored for different
> underlying runners.”
>
>The caching mechanism / the magic that helps the interactivity
> instrumenting process might need different implementation for different
> underlying runners. Because the runner can be anywhere deployed in any
> architecture, the notebook is just a process on a machine. They need to
> work together.
>Currently, we have the local file based cache. If we run a pipeline
> with underlying_runner as DataflowRunner, we’ll need something like GCS
> based cache. An in-memory cache might be runner agnostic, but it might
> explode with big data source.
>

Yep, we need filesystem/directory to use as a cache. We have an existing
temp_location flag that we can use for this (and is required for
distributed runners). If unset we can default to a local temp dir (which
works for the direct runner).


>Existing InteractiveRunner has the following portability
> <https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive#portability>.
> That’s why I said the interactivity (implementation) needs to be tailored
> for different underlying runners.
>If we allow users to pass in all kinds of underlying runners (even
> their in-house ones), we have to support the interactivity for all of them
> which we probably don't. That’s why we wanted a create_pipeline() wrapper
> so that in notebook, when building a pipeline, bundle to DirectRunner by
> default.
>The focus on the Direct Runner is also related to our objective: we
> want to provide easy-to-use notebook and some notebook environment where
> users can interactively execute pipelines without worrying about setup
> (especially when the setup is not Beam but Interactive Beam related).
> 6. We don’t fix typo for user defined transforms
>
> I'm talking about pruning like having a cell with
>
> pcoll | beam.Map(lambda x: expression_with_typo)
>
> and then fixing it (and re-evaluating) with
>
> pcoll | beam.Map(lambda x: expression_with_typo_fixed)
>
> where the former Map would *always* fail and never get removed from the
> pipeline.
>
> We never change the pipeline defined by the user. Interactivity is applied
> to a copy of user defined pipeline.
>

Sure. But does the executed (copy) of the pipeline contain the bad Map
operation in it? If so, it in essence "poisons" the entire pipeline,
forcing a user to re-create and re-define it from the start to make forward
progress (which results in quite a poor user experience--the errors in cell
N manifest in cell M, but worse fixing and re-executing cell N doesn't fix
cell M). If not, how is it intelligently excluded (and in a way that is not
too dis-similar from non-interactive mode, and doesn't cause surprises with
the p.run vs. run_pipeline difference)?


> 7.
>
> One approach was that the pipeline construction is re-executed every time
> (i.e the "pipeline" object to run is really a callback, like a callable or
> a PTransform) and then there's no ambiguity here.
>
> I didn’t quite get it. Pipeline construction only happens when a user
> executes a cell with the pipeline construction code.
> Are you suggesting changing the logic in pipeline.apply() to always
> reapply/replace a NamedPTransform? I don’t think we (Interactive Beam) can
> decide that because it changes the behavior of Beam.
> We had some thought of subclassing pipeline and use the create_pipeline()
> method to create the subclassed pipeline object. Then intercept the
> pipeline.apply() to always replace PTransform with existing full label and
> apply logic of parent pipeline’s apply() logic.
> It seems to be a no go to me now.
>

Sorry I wasn't clear. I'm referring to the style in the above doc about
getting rid of the Pipeline object (as a long-lived thing at least). In
this case the actual execution of pipeline construction never spans
multiple cells (though its implementation might via function calls) so one
never has out-of-date transforms dangling off the pipeline object.


> This has the downsides of recreating the PCollectiion objects which are
> being used as handles (though perhaps they could be re-identified).
>
> If a user re-executes a cell with PCollection = p | PTransform, the
> PCollection object will be a new instance. That is not a downside.
> We can keep the existing behavior of Beam to always raise an error when
> the cell with named PTransform is re-executed.
>
> Thanks!
>
> Ning.
>
>
>
> On Aug 23, 2019, at 11:36 AM, Robert Bradshaw  wrote:
>
> On Wed, Aug 21, 2019 at 3:33 PM GMAIL  wrote:
>
>> Thanks for the input, Robert!
>>
>> On Aug 21, 2019, at 11:49 AM, Robert Bradshaw 
>> wrote:
>>
>> On Wed, Aug 14, 2019 at 11

Re: Brief of interactive Beam

2019-08-23 Thread Robert Bradshaw
On Wed, Aug 21, 2019 at 3:33 PM GMAIL  wrote:

> Thanks for the input, Robert!
>
> On Aug 21, 2019, at 11:49 AM, Robert Bradshaw  wrote:
>
> On Wed, Aug 14, 2019 at 11:29 AM Ning Kang  wrote:
>
>> Ahmet, thanks for forwarding!
>>
>>
>>> My main concern at this point is the introduction of new concepts, even
>>> though these are not changing other parts of the Beam SDKs. It would be
>>> good to see at least an alternative option covered in the design document.
>>> The reason is each additional concept adds to the mental load of users. And
>>> also concepts from interactive Beam will shift user's expectations of Beam
>>> even though there are not direct SDK modifications.
>>
>>
>> Hi Robert. About the concern, I think I have a few points:
>>
>>1. *Interactive Beam (or Interactive Runner) is already an existing
>>"new concept" that normal Beam user could opt-in if they want an
>>interactive Beam experience.* They need to do lots of setup steps and
>>learn new things such as Jupyter notebook and at least interactive_runner
>>module to make it work and make use of it.
>>
>> I think we should start with the perspective that most users interested
> in using Beam interactively already know about Jupyter notebooks, or at
> least ipython, and would want to use it to learn (and more effectively use)
> Beam.
>
> Yes, I agree with the perspective for users who are familiar with
> notebook. Yet it doesn’t prevent us from creating ready-to-use containers
> (such as binder <https://github.com/jupyterhub/binderhub>)  for users who
> want to try Beam interactively without setting up a environment with all
> the dependencies interactive Beam introduces. I agree that experienced
> users understand how to set up additional dependencies and read examples,
> it’s just we are also targeting other entry level audiences.
> But back to the original topic, the design is not trying to add new
> concept, but fixing some rough edges of existing Interactive Beam features.
> We can discuss whether a factory of create_pipeline() is really desired and
> decide whether to expose it later. We hope the interactive_beam module to
> be the only module an Interactive Beam user would directly invoke in their
> notebook.
>

My goal would be that one uses a special interactive module for those
concepts that are unique to being interactive, and standard Beam concepts
(rather than replacements or wrappers) otherwise.

>
>>1. *The behavior of existing interactive Beam is different from
>>normal Beam because of the interactive nature and the users would expect
>>that.* And the users wouldn't shift their expectation of normal Beam.
>>Just like running Python scripts might result in different behavior than
>>running all of them in an interactive Python session.
>>
>> I'm not quite following this. One of the advantages strengths of Python
> is that lack of the difference between the interactive vs. non-interactive
> behavior. (The fact that a script's execution is always in top to bottom
> order, unlike a notebook, is the primary difference.)
>
> Sorry for the confusion. What I’m saying is about the hidden states.
> Running several Python scripts from top to bottom in an IPython session
> might generate different effects than running them in the same order
> normally. Say if you have an in-memory global configuration that is shared
> among all the scripts and if it’s missing, a script initializes one.
> Running the scripts in IPython will pass the initialization and
> modification of configuration along the scripts. While running the scripts
> one by one will initialize different configurations. Running cells in a
> notebook is equivalent to appending the cells into a script and run it. The
> interactivity is not about the order, but if there is hidden states
> preserved between each statement or each script execution. And the users
> should expect that there might be hidden states when they are in an
> interactive environment because that is exactly the interactivity they
> expect. However, they don’t hold the hidden states, the session does it for
> them. A user wouldn’t need to explicitly say “preserve the variable x I’ve
> defined in this cell because I want to reuse it in some other cells I’m
> going to execute”. The user can directly access variable x once the cell
> defining x is executed. And even if the user deletes the cell defining x, x
> still exists. At that stage, no one would know there is a variable x in
> memory by just looking at the notebook. One would see a missing execution
> sequence (on top left of each executed cell) and wonder wher

Re: Brief of interactive Beam

2019-08-21 Thread Robert Bradshaw
road of having different
interactive apis/experiences for different runners. In particular, the many
instances to the DirectRunner are worrisome--what's special about the
DirectRunner that other runners cannot provide that's needed for
interactive? If we can't come up with a good answer to that, we should not
impose this restriction.

>
>1. *When users run pipeline built from interactive runner in a
>non-interactive environment, it's direct runner like any other Beam
>tutorial demonstrates*. It's even easier because the user doesn't need
>to specify the runner nor pass in options.
>
>  So is the idea to have code like

if is_ipython() or is_jupyter() or is ...:
  do_something()
else:
  do_another_thing()

I'd really like to avoid this as it means one will (quite surprisingly, and
possibly for subtle reasons) not copy code from a notebook elsewhere. Or
did you mean something else here?

>
>1. *Interactive Beam is solving an orthogonal set of problems than
>Beam*. You can think of it as a wrapper of Beam that enables
>interactivity and it's not even a real runner. It doesn't change the Beam
>model such as how you build a pipeline. And with the Beam portability, you
>get the capability to run the pipeline built from interactive runner with
>other runners for free. It adds the interactive behavior that a user
>expects.
>2. *We want to open source it though we can iterate faster without
>doing it*. The whole project can be encapsulated in a completely
>irrelevant repository and from a developer's perspective, I want to hide
>all the implementation details from the interactive Beam user. However, as
>there is more and more desire for interactive Beam (+Mehran Nazir
> for more details), we want to share the
>implementation with others who want to contribute and explore the
>interactive world.
>
> I would much rather see interactivity as part of the Beam project. With
good APIs the implementations don't have to be tightly coupled (e.g. the
underlying runner delegation) but I think it will be a better user
experience if interactive was a mode rather than a wrapper with different
entry points.


I think watch() is a really good solution to knowing which collections to
cache, and visualize() will be very useful.

One thing I don't see tackled at all yet is the fact that pipelines are
only ever mutated by appending on new operations, so some design needs to
be done in terms of how to remove (possibly error-causing) operations or
replace bad ones with fixed ones. This is where most of the unsolved
problems lie.

Also +David Yan   for more opinions.
>
> Thanks!
>
> Ning.
>
> On Tue, Aug 13, 2019 at 6:00 PM Ahmet Altay  wrote:
>
>> Ning, I believe Robert's questions from his email has not been answered
>> yet.
>>
>> On Tue, Aug 13, 2019 at 5:00 PM Ning Kang  wrote:
>>
>>> Hi all, I'll leave another 3 days for design
>>> <https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit?usp=sharing>
>>>  review.
>>> Then we can have a vote session if there is no objection.
>>>
>>> Thanks!
>>>
>>> On Fri, Aug 9, 2019 at 12:14 PM Ning Kang  wrote:
>>>
>>>> Thanks Ahmet for the introduction!
>>>>
>>>> I've composed a design overview
>>>> <https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit?usp=sharing>
>>>> describing changes we are making to components around interactive runner.
>>>> I'll share the document in our email thread too.
>>>>
>>>> The truth is since interactive runner is not yet a recognized runner as
>>>> part of the Beam SDK (and it's fundamentally a wrapper around direct
>>>> runner), we are not touching any Beam SDK components.
>>>> We'll not change any behavior of existing Beam SDK and we'll try our
>>>> best to keep it that way in the future.
>>>>
>>>
>> My main concern at this point is the introduction of new concepts, even
>> though these are not changing other parts of the Beam SDKs. It would be
>> good to see at least an alternative option covered in the design document.
>> The reason is each additional concept adds to the mental load of users. And
>> also concepts from interactive Beam will shift user's expectations of Beam
>> even though there are not direct SDK modifications.
>>
>>
>>>
>>>> In the meantime, I'll work on other components orthogonal to Beam such
>>>> as Pipeline Display and Data Visu

Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-20 Thread Robert Bradshaw
The original timestamps are probably being assigned in the
watchForNewFiles transform, which is also setting the watermark:

https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
probably makes sense to be able to customize the lag here.

On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao  wrote:
>
> Hi Theodore,
>
> Thanks again for your insight and help. I'd like to learn more about how we 
> got the timestamp from WindowedValue initially from +dev@beam.apache.org
>
> -Chengzhi
>
> On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu  wrote:
>>
>> Hi Chengzhi,
>>
>> I'm not completely sure where/how the timestamp is set for a ProcessContext 
>> object. Here is the error code found within the Apache Beam repo.
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>> which makes reference to `elem.getTimestamp()` where elem is a WindowedValue.
>>
>> I am thinking +dev@beam.apache.org can offer some insight. Would be 
>> interested to find out more myself.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao  
>> wrote:
>>>
>>> Hi Theodore,
>>>
>>> Thanks for your reply. This is just a simple example that I tried to 
>>> understand how event time works in Beam. I could have more fields and I 
>>> would have an event time for each of record, so I tried to let Beam know 
>>> which filed is the event time to use for later windowing and computation.
>>>
>>> I think we you mentioned the probable reason sounds reasonable, I am still 
>>> trying to figure out in the error message "current input 
>>> (2019-08-16T12:39:06.887Z)" is coming from if you have any insight on it.
>>>
>>> Thanks a lot for your help.
>>>
>>> -- Chengzhi
>>>
>>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu  wrote:

 Hi Chengzhi,

 Are you simply trying to emit the timestamp onward? Why not just use 
 `out.output` with an PCollection?

 static class ReadWithEventTime extends DoFn {
 @DoFn.ProcessElement
 public void processElement(@Element String line, 
 OutputReceiver out){
 out.output(new Instant(Long.parseLong(line)));
 }
 }

 You can also output the line itself as a PCollection. If you line 
 has additional information to parse, consider a KeyValue Pair 
 https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
  where you can emit both some parsed context of the string and the 
 timestamp.

 The probable reason why outputWithTimestamp doesn't work with older times 
 is that the timestamp emitted is used specifically for windowing and for 
 streaming type Data pipelines to determine which window each record 
 belongs for aggregations.

 -Theo


 On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao  
 wrote:
>
> Hi folks,
>
> I am new to Beam and try to play with some example, I am running Beam 
> 2.14 with Direct runner to read some files (I continue generated).
>
> I am facing this error: Cannot output with timestamp 
> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the 
> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the 
> allowed skew (0 milliseconds). I searched online but still don't quite 
> understand it so I am asking here for some help.
>
> A file has some past timestamp in it:
> 1565958615120
> 1565958615120
> 1565958615121
>
> My code looks something like this:
>
>static class ReadWithEventTime extends DoFn {
> @ProcessElement
> public void processElement(@Element String line, 
> OutputReceiver out){
> out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
> }
> }
>
> public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
>
> String sourcePath = new File("files/").getPath();
>
> PCollection data = pipeline.apply("ReadData",
> TextIO.read().from(sourcePath + "/test*")
> .watchForNewFiles(Duration.standardSeconds(5), 
> Watch.Growth.never()));
>
> data.apply("ReadWithEventTime", ParDo.of(new 
> ReadWithEventTime()));
>
> pipeline.run().waitUntilFinish();
>
> }
>
>
> I am trying to understand in the error message where "current input 
> (2019-08-16T12:39:06.887Z)" is comming from. Is it the lowest watermark 
> when I start my application? If that's the case, is there a way that I 
> can change the initial watermark?
>
> Also, I can setup `withAllowedTimestampSkew` but it looks like it has 
>

Re: [PROPOSAL] An initial Schema API in Python

2019-08-20 Thread Robert Bradshaw
On Mon, Aug 19, 2019 at 5:44 PM Ahmet Altay  wrote:
>
>
>
> On Mon, Aug 19, 2019 at 9:56 AM Brian Hulette  wrote:
>>
>>
>>
>> On Fri, Aug 16, 2019 at 5:17 PM Chad Dombrova  wrote:

 >> Agreed on float since it seems to trivially map to a double, but I’m 
 >> torn on int still. While I do want int type hints to work, it doesn’t 
 >> seem appropriate to map it to AtomicType.INT64, since it has a 
 >> completely different range of values.
 >>
 >> Let’s say we used native int for the runtime field type, not just as a 
 >> schema declaration for numpy.int64. What is the real world fallout from 
 >> this? Would there be data loss?
 >
 > I'm not sure I follow the question exactly, what is the interplay 
 > between int and numpy.int64 in this scenario? Are you saying that 
 > np.int64 is used in the schema declaration, but we just use native int 
 > at runtime, and check the bit width when encoding?
 >
 > In any case, I don't think the real world fallout of using int is nearly 
 > that dire. I suppose data loss is possible if a poorly designed pipeline 
 > overflows an int64 and crashes,

 The primary risk is that it *won't* crash when overflowing an int64,
 it'll just silently give the wrong answer. That's much less safe than
 using a native int and then actually crashing in the case it's too
 large at the point one tries to encode it.
>>>
>>>
>>> If the behavior of numpy.int64 is less safe than int, and both support 
>>> 64-bit integers, and int is the more intuitive type to use, then that seems 
>>> to make a strong case for using int rather than numpy.int64.
>>>
>>
>> I'm not sure we established numpy.int64 is less safe, just that a silent 
>> overflow is a risk.

Silent overflows are inherently less safe, especially for a language
where users in general never have to deal with this.

>> By default numpy will just log a warning when an overflow occurs, so it's 
>> not totally silent, but definitely risky. numpy can however be made to throw 
>> an exception when an overflow occurs with `np.seterr(over='raise')`.

Warning logs on remote machines are unlikely to ever be seen. Even if
one knew about the numpy setting (keep in mind the user may not ever
directly user or import numpy), it doesn't seem to work (and one would
have to set it on the remote workers, or propagate this setting if set
in the main program).

In [1]: import numpy as np
In [2]: np.seterr(over='raise')  # returns previous value
Out[2]: {'divide': 'warn', 'invalid': 'warn', 'over': 'warn', 'under': 'ignore'}
In [3]: np.int64(2**36) * np.int64(2**36)
Out[3]: 0

>> Regardless of what type is used in the typing representation of a schema, 
>> we've established that RowCoder.encode should accept anything convertible to 
>> an int for integer fields. So it will need to check it's width and raise an 
>> error if it's too large.
>> I added some tests last week to ensure that RowCoder does this [1]. However 
>> they're currently skipped because I'm unsure of the proper place to raise 
>> the error. I wrote up the details in a comment [2] (sorry I did a force push 
>> so the comment doesn't show up in the appropriate place).
>>
>> Note that when decoding an INT32/64 field RowCoder still produces plain old 
>> ints (since it relies on VarIntCoder), so int really is the runtime type, 
>> and the numpy types are just for the typing representation of a schema.
>>
>> I also updated my PR to accept int, float, and str in the typing 
>> representation of a schema, and added the following summary of type mappings 
>> to typehints.schema [1], since it's not readily apparent from the code 
>> itself:
>
>
> Cool!
>
>>
>>
>> Python  Schema
>> np.int8 <-> BYTE
>> np.int16<-> INT16
>> np.int32<-> INT32
>> np.int64<-> INT64
>> int ---/
>> np.float32  <-> FLOAT
>> np.float64  <-> DOUBLE
>> float   ---/
>> bool<-> BOOLEAN
>> The mappings for STRING and BYTES are different between python 2 and python 
>> 3,
>> because of the changes to str:
>> py3:
>> str/unicode <-> STRING
>> bytes   <-> BYTES
>> ByteString  ---/
>> py2:
>> unicode <-> STRING
>> str/bytes   ---/
>> ByteString  <-> BYTES
>>
>> As you can see, int and float typings can now be used to create a schema 
>> with an INT64 or DOUBLE attribute, but when creating an anonymous NamedTuple 
>> sub-class from a schema, the numpy types are preferred. I prefer that 
>> approach, if only for symmetry with the other integer and floating point 
>> types, but I can change it to prefer int/float if I'm the only one that 
>> feels that way.

Just to be clear, this is just talking about the schema itself (as at
that level, due to the many-to-one mapping above, no distinction is
made between int vs. int64). The runtime types are still int/float,
right?

> Just an opinion: As a user I would expect anonymous types created for me to 
> have n

Re: (mini-doc) Beam (Flink) portable job templates

2019-08-20 Thread Robert Bradshaw
The point of expansion services is to run at pipeline construction
time so that the caller can build on top of the outputs. E.g. we're
hoping to expose Beam's SQL transforms to other languages via an
expansion service and *not* duplicate the logic of parsing the SQL
statements to determine the type(s) of the outputs. Even for simpler
IOs, we would like to take advantage of schema information (e.g.
looked up at construction time) to produce results and validate (or
even inform) subsequent construction.

I think we're also making a mistake in talking about "the" expansion
service here, as if there was only one well defined service that all
pipenes used. If we go the route of deferring some expansion to the
runner, we need a way of naming expansion services. It seems like this
proposal is simply isomorphic to defining new primitive transforms
which some (all?) runners are just expected to understand.

On Tue, Aug 20, 2019 at 10:11 AM Thomas Weise  wrote:
>
>
>
> On Tue, Aug 20, 2019 at 8:56 AM Lukasz Cwik  wrote:
>>
>>
>>
>> On Mon, Aug 19, 2019 at 5:52 PM Ahmet Altay  wrote:
>>>
>>>
>>>
>>> On Sun, Aug 18, 2019 at 12:34 PM Thomas Weise  wrote:
>>>>
>>>> There is a PR open for this: https://github.com/apache/beam/pull/9331
>>>>
>>>> (it wasn't tagged with the JIRA and therefore not linked)
>>>>
>>>> I think it is worthwhile to explore how we could further detangle the 
>>>> client side Python and Java dependencies.
>>>>
>>>> The expansion service is one more dependency to consider in a build 
>>>> environment. Is it really necessary to expand external transforms prior to 
>>>> submission to the job service?
>>>
>>>
>>> +1, this will make it easier to use external transforms from the already 
>>> familiar client environments.
>>>
>>
>>
>> The intent is to make it so that you CAN (not MUST) run an expansion service 
>> separate from a Runner. Creating a single endpoint that hosts both the Job 
>> and Expansion service is something that gRPC does very easily since you can 
>> host multiple service definitions on a single port.
>
>
> Yes, that's fine. The point here is when the expansion occurs. I believe the 
> runner can also invoke the expansion service, thereby eliminating the 
> expansion service interaction from the client side.
>
>
>>
>>
>>>>
>>>>
>>>> Can we come up with a partially constructed proto that can be produced by 
>>>> just running the Python entry point? Note this would also require pushing 
>>>> the pipeline options parsing into the job service.
>>>
>>>
>>> Why would this require pushing the pipeline options parsing to the job 
>>> service. Assuming that python will have enough idea about the external 
>>> transform what options it will need. The necessary bit could be converted 
>>> to arguments and be part of that partially constructed proto.
>>>
>>>>
>>>>
>>>> On Sun, Aug 18, 2019 at 12:01 PM enrico canzonieri  
>>>> wrote:
>>>>>
>>>>> I found the tracking ticket at BEAM-7966
>>>>>
>>>>> On Sun, Aug 18, 2019 at 11:59 AM enrico canzonieri 
>>>>>  wrote:
>>>>>>
>>>>>> Is this alternative still being considered? Creating a portable jar 
>>>>>> sounds like a good solution to re-use the existing runner specific 
>>>>>> deployment mechanism (e.g. Flink k8s operator) and in general simplify 
>>>>>> the deployment story.
>>>>>>
>>>>>> On Fri, Aug 9, 2019 at 12:46 AM Robert Bradshaw  
>>>>>> wrote:
>>>>>>>
>>>>>>> The expansion service is a separate service. (The flink jar happens to
>>>>>>> bring both up.) However, there is negotiation to receive/validate the
>>>>>>> pipeline options.
>>>>>>>
>>>>>>> On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise  wrote:
>>>>>>> >
>>>>>>> > We would also need to consider cross-language pipelines that 
>>>>>>> > (currently) assume the interaction with an expansion service at 
>>>>>>> > construction time.
>>>>>>> >
>>>>>>> > On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver  wrote:
>>>>>>> >>
>>>>>>> >> > It might also be usef

Re: Java 11 compatibility question

2019-08-09 Thread Robert Bradshaw
On Fri, Aug 9, 2019 at 12:48 PM Michał Walenia 
wrote:

> From what I understand, the Java 8 -> 11 testing isn't in essence similar
> to py2 -> py3 checks.
>

True. Python 3 is in many ways a new language, and much less (and more
subtly) backwards compatible. You also can't "link" Python 3 code against
Python 2 code the way you can use old Java classes in new JVMs.


> In the case of Java, all we want to do is check if Beam downloaded by
> users from Maven (and compiled with JDK8) won't act up if used from a
> JDK/JRE 11 environment. We don't want to migrate the tool itself to a newer
> language version. As I mentioned in my previous email, there already are
> test suites checking compatibility - ValidatesRunner on Direct and Dataflow
> runners running in normal and portable mode.
> Those tests keep passing, so I believe we're mostly fine regarding
> compatibility.
> All I want to know is - is this enough?
> How else can we test Beam to be sure it works in JRE 11? After several
> accidental launches of build tasks in JDK 11, I am sure that it's not
> buildable with it, but this is not the compatibility type we want to check.
>

Well, we will want this eventually. Before that, we'll want to be sure
users can build their Java 11 code against our artifacts.


>
> Thank you for your replies,
> Michal
>
>
> On Thu, Aug 8, 2019 at 10:25 PM Valentyn Tymofieiev 
> wrote:
>
>> From Python 3 migration standpoint, some high level pillars that increase
>> our confidence are:
>> - Test coverage: (PreCommit, PostCommit), creating a system to make it
>> easy for add test coverage in new language for new functionality.
>> - Support of new language version by core runners + ValidatesRunner test
>> coverage.
>> - Test of time: offer new functionality in a few releases, monitor &
>> address user feedback.
>>
>> Dependency audit and critical feature support in new language, as
>> mentioned by others, are important  points. If you are curious about
>> detailed AIs that went into Python 3 support, feel free to look into
>> BEAM-1251 or Py3 Kanban Board (
>> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
>> ).
>>
>> Thanks,
>> Valentyn
>>
>>
>> On Thu, Aug 8, 2019 at 7:24 PM Mark Liu  wrote:
>>
>>> Some actions we did for py2 to py3 works:
>>> - Check and resolve incompatible dependencies.
>>> - Enable py3 lint.
>>> - Fill feature gaps between py2 and py3 (e.g. new py3 container, new
>>> solution for type hint)
>>> - Add unit tests, integration tests and other tests on py3 for coverage.
>>> - Release (p3) and deprecation (p2) plan.
>>>
>>> Hope this helps on Java upgrade.
>>>
>>> Mark
>>>
>>> On Wed, Aug 7, 2019 at 3:19 PM Ahmet Altay  wrote:
>>>


 On Wed, Aug 7, 2019 at 12:21 PM Elliotte Rusty Harold <
 elh...@ibiblio.org> wrote:

> gRPC bug here: https://github.com/grpc/grpc-java/issues/3522
>
> google-cloud-java bug:
> https://github.com/googleapis/google-cloud-java/issues/5760
>
> Neither has a cheap or easy fix, I'm afraid. Commenting on these
> issues might help us prove that there's a demand to priorotize these
> compared to other work. If anyone has a support contract and could
> file a ticket asking for a fix, that would help even more.
>
> Those are the two I know about. There might be others elsewhere in the
> dependency tree.
>
>
> On Wed, Aug 7, 2019 at 2:25 PM Lukasz Cwik  wrote:
> >
> > Since java8 -> java11 is similar to python2 -> python3 migration,
> what was the acceptance criteria there?
>

 I do not remember formally discussing this. The bar used was, all
 existing tests will pass for python2 and python3. New tests will be added
 for python3 specific features. (To avoid any confusion this bar has not
 been cleared yet.)

 cc: +Valentyn Tymofieiev  could add more details.


> >
> > On Wed, Aug 7, 2019 at 1:54 PM Elliotte Rusty Harold <
> elh...@ibiblio.org> wrote:
> >>
> >>
> >>
> >> On Wed, Aug 7, 2019 at 9:41 AM Michał Walenia <
> michal.wale...@polidea.com> wrote:
> >>>
> >>>
> >>> Are these tests sufficient to say that we’re java 11 compatible?
> What other aspects do we need to test to be able to say that?
> >>>
> >>>
> >>
> >> Are any packages split across multiple jar files, including
> packages beam dependns on? That's the one that's bitten some other
> projects, including google-cloud-java and gRPC. If so, beam is not going 
> to
> work with the module system.
> >>
> >> Work is ongoing to fix splitn packages in both gRPC and
> google-cloud-java, but we're not very far down that path and I think it's
> going to be an API breaking change.
> >>
> > Romain pointed this out earlier and I fixed the last case of
> packages being split across multiple jars within Apache Beam but as you
> point out our transitive dependencies are not re

Re: Write-through-cache in State logic

2019-08-09 Thread Robert Bradshaw
The question is whether the SDK needs to wait for the StateResponse to
come back before declaring the bundle done. The proposal was to not
send the cache token back as part of an append StateResponse [1], but
pre-provide it as part of the bundle request.

Thinking about this some more, if we assume the state response was
successfully applied, there's no reason for the SDK to block the
bundle until it has its hands on the cache token--we can update the
cache once the StateResponse comes back whether or not the bundle is
still active. On the other hand, the runner needs a way to assert it
has received and processed all StateRequests from the SDK associated
with a bundle before it can declare the bundle complete (regardless of
the cache tokens), so this might not be safe without some extra
coordination (e.g. the ProcessBundleResponse indicating the number of
state requests associated with a bundle).

[1] 
https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627

On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik  wrote:
>
> The purpose of the new state API call in BEAM-7000 is to tell the runner that 
> the SDK is now blocked waiting for the result of a specific state request and 
> it should be used for fetches (not updates) and is there to allow for SDKs to 
> differentiate readLater (I will need this data at some point in time in the 
> future) from read (I need this data now). This comes up commonly where the 
> user prefetches multiple state cells and then looks at their content allowing 
> the runner to batch up those calls on its end.
>
> The way it can be used for clear+append is that the runner can store requests 
> in memory up until some time/memory limit or until it gets its first 
> "blocked" call and then issue all the requests together.
>
>
> On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw  wrote:
>>
>> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise  wrote:
>> >
>> > That would add a synchronization point that forces extra latency 
>> > especially in streaming mode.
>> >
>> > Wouldn't it be possible for the runner to assign the token when starting 
>> > the bundle and for the SDK to pass it along the state requests? That way, 
>> > there would be no need to batch and wait for a flush.
>>
>> I think it makes sense to let the runner pre-assign these state update
>> tokens rather than forcing a synchronization point.
>>
>> Here's some pointers for the Python implementation:
>>
>> Currently, when a DoFn needs UserState, a StateContext object is used
>> that converts from a StateSpec to the actual value. When running
>> portably, this is FnApiUserStateContext [1]. The state handles
>> themselves are cached at [2] but this context only lives for the
>> lifetime of a single bundle. Logic could be added here to use the
>> token to share these across bundles.
>>
>> Each of these handles in turn invokes state_handler.get* methods when
>> its read is called. (Here state_handler is a thin wrapper around the
>> service itself) and constructs the appropriate result from the
>> StateResponse. We would need to implement caching at this level as
>> well, including the deserialization. This will probably require some
>> restructoring of how _StateBackedIterable is implemented (or,
>> possibly, making that class itself cache aware). Hopefully that's
>> enough to get started.
>>
>> [1] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>> [2] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
>> .
>>
>> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik  wrote:
>> >>
>> >> I believe the intent is to add a new state API call telling the runner 
>> >> that it is blocked waiting for a response (BEAM-7000).
>> >>
>> >> This should allow the runner to wait till it sees one of these I'm 
>> >> blocked requests and then merge + batch any state calls it may have at 
>> >> that point in time allowing it to convert clear + appends into set calls 
>> >> and do any other optimizations as well. By default, the runner would have 
>> >> a time and space based limit on how many outstanding state calls there 
>> >> are before choosing to resolve them.
>> >>
>> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:
>> >>>
>> >>> Now I see what you mean.
>> >>>
>> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
>> >

Re: (mini-doc) Beam (Flink) portable job templates

2019-08-09 Thread Robert Bradshaw
The expansion service is a separate service. (The flink jar happens to
bring both up.) However, there is negotiation to receive/validate the
pipeline options.

On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise  wrote:
>
> We would also need to consider cross-language pipelines that (currently) 
> assume the interaction with an expansion service at construction time.
>
> On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver  wrote:
>>
>> > It might also be useful to have the option to just output the proto and 
>> > artifacts, as alternative to the jar file.
>>
>> Sure, that wouldn't be too big a change if we were to decide to go the SDK 
>> route.
>>
>> > For the Flink entry point we would need to allow for the job server to be 
>> > used as a library.
>>
>> We don't need the whole job server, we only need to add a main method to 
>> FlinkPipelineRunner [1] as the entry point, which would basically just do 
>> the setup described in the doc then call FlinkPipelineRunner::run.
>>
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L53
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 4:21 PM Thomas Weise  wrote:
>>>
>>> Hi Kyle,
>>>
>>> It might also be useful to have the option to just output the proto and 
>>> artifacts, as alternative to the jar file.
>>>
>>> For the Flink entry point we would need to allow for the job server to be 
>>> used as a library. It would probably not be too hard to have the Flink job 
>>> constructed via the context execution environment, which would require no 
>>> changes on the Flink side.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Thu, Aug 8, 2019 at 9:52 AM Kyle Weaver  wrote:
>>>>
>>>> Re Javaless/serverless solution:
>>>> I take it this would probably mean that we would construct the jar 
>>>> directly from the SDK. There are advantages to this: full separation of 
>>>> Python and Java environments, no need for a job server, and likely a 
>>>> simpler implementation, since we'd no longer have to work within the 
>>>> constraints of the existing job server infrastructure. The only downside I 
>>>> can think of is the additional cost of implementing/maintaining jar 
>>>> creation code in each SDK, but that cost may be acceptable if it's simple 
>>>> enough.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>>
>>>>
>>>> On Thu, Aug 8, 2019 at 9:31 AM Thomas Weise  wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 8, 2019 at 8:29 AM Robert Bradshaw  
>>>>> wrote:
>>>>>>
>>>>>> > Before assembling the jar, the job server runs to create the 
>>>>>> > ingredients. That requires the (matching) Java environment on the 
>>>>>> > Python developers machine.
>>>>>>
>>>>>> We can run the job server and have it create the jar (and if we keep
>>>>>> the job server running we can use it to interact with the running
>>>>>> job). However, if the jar layout is simple enough, there's no need to
>>>>>> even build it from Java.
>>>>>>
>>>>>> Taken to the extreme, this is a one-shot, jar-based JobService API. We
>>>>>> choose a standard layout of where to put the pipeline description and
>>>>>> artifacts, and can "augment" an existing jar (that has a
>>>>>> runner-specific main class whose entry point knows how to read this
>>>>>> data to kick off a pipeline as if it were a users driver code) into
>>>>>> one that has a portable pipeline packaged into it for submission to a
>>>>>> cluster.
>>>>>
>>>>>
>>>>> It would be nice if the Python developer doesn't have to run anything 
>>>>> Java at all.
>>>>>
>>>>> As we just discussed offline, this could be accomplished by  including 
>>>>> the proto that is produced by the SDK into the pre-existing jar.
>>>>>
>>>>> And if the jar has an entry point that creates the Flink job in the 
>>>>> prescribed manner [1], it can be directly submitted to the Flink REST 
>>>>> API. That would allow for Java free client.
>>>>>
>>>>> [1] 
>>>>> https://lists.apache.org/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
>>>>>


Re: Beam Python Portable Runner - Adding timeout to JobServer grpc calls

2019-08-09 Thread Robert Bradshaw
If we do provide a configuration value for this, I would make it have a
fairly large default and ure-use the flag for all RPCs of similar nature,
not tweeks for this particular service only.

On Fri, Aug 9, 2019 at 2:58 AM Ahmet Altay  wrote:

> Default plus a flag to override sounds reasonable. Although from Dataflow
> experience I do not remember timeouts causing issues and each new added
> flag adds complexity. What do others think?
>
> On Thu, Aug 8, 2019 at 11:38 AM Kyle Weaver  wrote:
>
>> If we do make a default, I still think it should be configurable via a
>> flag. I can't think of why the prepare, stage artifact, job state, or job
>> message requests might take more than 60 seconds, but you never know,
>> particularly with artifact staging, which might be uploading artifacts to
>> distributed storage.
>>
>> I assume the run request itself would not be subject to timeouts, as
>> running the pipeline can be assumed to take significantly longer than the
>> setup work.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 11:20 AM Enrico Canzonieri 
>> wrote:
>>
>>> Default timeout with no flag may work as well.
>>> The main consideration here is whether some api calls may take longer
>>> than 60 seconds because of the complexity of the users' Beam pipeline. E.g.
>>> Could job_service.Prepare() take longer than 60 seconds if the given Beam
>>> pipeline is extremely complex?
>>>
>>> Basically if there are cases when the user code may cause the call
>>> duration to increase to the point the timeout prevents submitting the app
>>> itself then we should consider having a flag.
>>>
>>> On 2019/08/07 20:13:12, Ahmet Altay wrote:
>>> > Could we pick a default timeout value instead of introducing a flag?
>>> We use>
>>> > 60 seconds as the default timeout for http client [1], we can do the
>>> same>
>>> > here.>
>>> >
>>> > [1]>
>>> >
>>> https://github.com/apache/beam/blob/3a182d64c86ad038692800f5c343659ab0b935b0/sdks/python/apache_beam/internal/http_client.py#L32>
>>>
>>> >
>>> > On Wed, Aug 7, 2019 at 11:53 AM enrico canzonieri >
>>> > wrote:>
>>> >
>>> > > Hello,>
>>> > >>
>>> > > I noticed that the calls to the JobServer from the Python SDK do not
>>> have>
>>> > > timeouts. If I'm not mistaken that means that the call to
>>> pipeline.run()>
>>> > > could hang forever if the JobServer is not running (or failing to
>>> start).>
>>> > > E.g.>
>>> > >
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307>
>>>
>>> > > the call to Prepare() doesn't provide any timeout value and the
>>> same>
>>> > > applies to other JobServer requests.>
>>> > > I was considering adding a --job-server-request-timeout to the>
>>> > > PortableOptions>
>>> > > >
>>> > > class to be used in the JobServer interactions inside
>>> probable_runner.py.>
>>> > > Is there any specific reason why the timeout is not currently
>>> supported?>
>>> > > Does anybody have any objection adding the jobserver timeout? I
>>> could>
>>> > > volunteer to file a ticket and submit a pr for this.>
>>> > >>
>>> > > Cheers,>
>>> > > Enrico Canzonieri>
>>> > >>
>>> >
>>>
>>>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Robert Bradshaw
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari  wrote:
>
> I only ran in Direct runner. I will run in other runners and let you know the 
> results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
>>
>> Have you tried running this on more than one runner (e.g. Dataflow, Flink, 
>> Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari  
>> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>>> I am using Create.of() PTransform to create a PCollection from in-memory. 
>>> When a coder is given with Create.of() PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent and correct(Maybe 
>>> this is just a coincidence and the problem is at some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with Serialization/Deserialization (or) 
>>> GroupIntoBatches (or) Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent results are 
>>> available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul


Re: Write-through-cache in State logic

2019-08-08 Thread Robert Bradshaw
 bundles would be capped 
>>>>>>> by the count limit. Bumping the count limit increases the throughput by 
>>>>>>> reducing the chatter over the state plane (more cache hits due to 
>>>>>>> larger bundle).
>>>>>>>
>>>>>>> The next level of investigation would involve profiling. But just by 
>>>>>>> looking at metrics, the CPU utilization on the Python worker side 
>>>>>>> dropped significantly while on the Flink side it remains nearly same. 
>>>>>>> There are no metrics for state operations on either side, I think it 
>>>>>>> would be very helpful to get these in place also.
>>>>>>>
>>>>>>> Below the stateful processing code for reference.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> class StatefulFn(beam.DoFn):
>>>>>>> count_state_spec = userstate.CombiningValueStateSpec(
>>>>>>> 'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), 
>>>>>>> sum)
>>>>>>> timer_spec = userstate.TimerSpec('timer', 
>>>>>>> userstate.TimeDomain.WATERMARK)
>>>>>>>
>>>>>>> def process(self, kv, count=beam.DoFn.StateParam(count_state_spec), 
>>>>>>> timer=beam.DoFn.TimerParam(timer_spec), window=beam.DoFn.WindowParam):
>>>>>>> count.add(1)
>>>>>>> timer_seconds = (window.end.micros // 100) - 1
>>>>>>> timer.set(timer_seconds)
>>>>>>>
>>>>>>> @userstate.on_timer(timer_spec)
>>>>>>> def process_timer(self, 
>>>>>>> count=beam.DoFn.StateParam(count_state_spec), 
>>>>>>> window=beam.DoFn.WindowParam):
>>>>>>> if count.read() == 0:
>>>>>>> logging.warning("###timer fired with count %d, window %s" % 
>>>>>>> (count.read(), window))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw  
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar  
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Thanks Robert,
>>>>>>>> >
>>>>>>>> >  I stumble on the jira that you have created some time ago
>>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>>>>>>> >
>>>>>>>> > You also marked code where code changes are required:
>>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>>>>> >
>>>>>>>> > I am willing to provide help to implement this. Let me know how I 
>>>>>>>> > can help.
>>>>>>>>
>>>>>>>> As far as I'm aware, no one is actively working on it right now.
>>>>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>>>>>>> to answer any questions you might have if (well probably when) these
>>>>>>>> pointers are insufficient.
>>>>>>>>
>>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
>>>>>>>> >  wrote:
>>>>>>>> >>
>>>>>>>> >> This is documented at
>>>>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>>>>> >> . Note that it requires participation of both the runner and the SDK
>>>>>>>> >> (though there are no correctness issues if one or the other side 
>>>>>

Re: (mini-doc) Beam (Flink) portable job templates

2019-08-08 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 5:59 PM Thomas Weise  wrote:
>
>> > * The pipeline construction code itself may need access to cluster 
>> > resources. In such cases the jar file cannot be created offline.
>>
>> Could you elaborate?
>
>
> The entry point is arbitrary code written by the user, not limited to Beam 
> pipeline construction alone. For example, there could be access to a file 
> system or other service to fetch metadata that is required to build the 
> pipeline. Such services can be accessed when the code runs within the 
> infrastructure, but typically not in a development environment.

Yes, this may be limited to the case that the pipeline construction
can be done on the users machine before submission (remotely staging
the executing the Python (or Go, or ...) code within the
infrastructure to build the pipeline and then running the job server
there is a bit more complicated). We control the entry point from then
on.

>> > * For k8s deployment, a container image with the SDK and application code 
>> > is required for the worker. The jar file (which is really a derived 
>> > artifact) would need to be built in addition to the container image.
>>
>> Yes. For standard use, a vanilla released Beam published SDK container
>> + staged artifacts should be sufficient.
>>
>> > * To build such jar file, the user would need a build environment with job 
>> > server and application code. Do we want to make that assumption?
>>
>> Actually, it's probably much easier than that. A jar file is just a
>> zip file with a standard structure, to which one can easily add (data)
>> files without having a full build environment. The (pre-compiled) main
>> class would know how to read this data to construct the pipeline and
>> kick off the job just like any other Flink job.
>
> Before assembling the jar, the job server runs to create the ingredients. 
> That requires the (matching) Java environment on the Python developers 
> machine.

We can run the job server and have it create the jar (and if we keep
the job server running we can use it to interact with the running
job). However, if the jar layout is simple enough, there's no need to
even build it from Java.

Taken to the extreme, this is a one-shot, jar-based JobService API. We
choose a standard layout of where to put the pipeline description and
artifacts, and can "augment" an existing jar (that has a
runner-specific main class whose entry point knows how to read this
data to kick off a pipeline as if it were a users driver code) into
one that has a portable pipeline packaged into it for submission to a
cluster.


Re: [PROPOSAL] An initial Schema API in Python

2019-08-08 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 11:12 PM Brian Hulette  wrote:
>
> Thanks for all the suggestions, I've added responses inline.
>
> On Wed, Aug 7, 2019 at 12:52 PM Chad Dombrova  wrote:
>>
>> There’s a lot of ground to cover here, so I’m going to pull from a few 
>> different responses.
>>
>> 
>>
>> numpy ints
>>
>> A properly written library should accept any type implementing the int (or 
>> index) methods in place of an int, rather than doing explicit type checks
>>
>> Yes, but the reality is that very very few actually do this, including Beam 
>> itself (check the code for Timestamp and Duration, to name a few).
>>
>> Which brings me to my next topic:
>>
>> I tested this out with mypy and it would not be compatible:
>>
>> def square(x: int):
>> return x*x
>>
>> square(np.int16(32)) # mypy error
>>
>> The proper way to check this scenario is using typing.SupportsInt. Note that 
>> this only guarantees that __int__ exists, so you still need to cast to int 
>> if you want to do anything with the object:
>>
>> def square(x: typing.SupportsInt) -> int:
>> if not isinstance(x, int):
>> x = int(x)
>> return x*x
>>
>> square('foo')  # error!
>> square(1.2)  # ok
>
>  Yep I came across this while writing my last reply. I agree though it seems 
> unlikely that many libraries actually do this.
>
>> 
>>
>> Native python ints
>>
>> Agreed on float since it seems to trivially map to a double, but I’m torn on 
>> int still. While I do want int type hints to work, it doesn’t seem 
>> appropriate to map it to AtomicType.INT64, since it has a completely 
>> different range of values.
>>
>> Let’s say we used native int for the runtime field type, not just as a 
>> schema declaration for numpy.int64. What is the real world fallout from 
>> this? Would there be data loss?
>
> I'm not sure I follow the question exactly, what is the interplay between int 
> and numpy.int64 in this scenario? Are you saying that np.int64 is used in the 
> schema declaration, but we just use native int at runtime, and check the bit 
> width when encoding?
>
> In any case, I don't think the real world fallout of using int is nearly that 
> dire. I suppose data loss is possible if a poorly designed pipeline overflows 
> an int64 and crashes,

The primary risk is that it *won't* crash when overflowing an int64,
it'll just silently give the wrong answer. That's much less safe than
using a native int and then actually crashing in the case it's too
large at the point one tries to encode it.

> but that's possible whether we use int or np.int64 at runtime. I'm just 
> saying that a user could be forgiven for thinking that they're safe from 
> overflows if they declare a schema as NamedTuple('Foo', 
> [('to_infinity_and_beyond', int)]), but they shouldn't make the same mistake 
> when they explicitly call it an int64.

Yes. But for schemas to be maximally useful, we'll want to be able to
infer them from all sorts of things that aren't written with Beam in
mind (e.g. external data classes, function annotations) and rejecting
the builtin int type will be a poor user experience here.

>> 
>>
>> Python3-only
>>
>> No need to worry about 2/3 compatibility for strings, we could just use str
>>
>> This is already widely handled throughout the Beam python SDK using the 
>> future/past library, so it seems silly to give up on this solution for 
>> schemas.
>>
>> On this topic, I added some comments to the PR about using 
>> past.builtins.unicode instead of numpy.unicode. They’re the same type, but 
>> there’s no reason to get this via numpy, when everywhere else in the code 
>> gets it from past.
>>
>> We could just use bytes for byte arrays (as a shorthand for 
>> typing.ByteString [1])
>>
>> Neat, but in my obviously very biased opinion it is not worth cutting off 
>> python2 users over this.
>
> Ok I won't do this :) I wasn't aware of typing.Sequence, that does seem like 
> a good fit. The other two items are just nice-to-haves, I'm happy to work 
> around those and use Sequence for arrays instead.

I would imagine that we could accept bytes or typing.ByteString for
BYTES, with only Python 2 users having to do the latter. (In both
Python 2 and Python 3 one would use str for STRING, it would decode to
past.builtins.unicode. This seems to capture the intent better than
mapping str to BYTES in Python 2 only.)


Re: Brief of interactive Beam

2019-08-08 Thread Robert Bradshaw
Thanks for the note. Are there any associated documents worth sharing as
well? More below.

On Wed, Aug 7, 2019 at 9:39 PM Ning Kang  wrote:

> To whom may concern,
>
> This is Ning from Google. We are currently making efforts to leverage an
> interactive runner under python beam sdk.
>
> There is already an interactive Beam (iBeam for short) runner with jupyter
> notebook in the repo
> 
> .
> Following the instructions on that page, one can set up an interactive
> environment to develop and execute Beam pipeline interactively.
>
> However, there are many issues with existing iBeam. One issue is that it
> uses a concept of leaf PCollection to cache and materialize intermediate
> PCollection. If the user wants to reuse/introspect a non-leaf PCollection,
> the interactive runner will run into errors.
>
> Our initial effort will be fixing the existing issues. And we also want to
> make iBeam easy to use. Since iBeam uses the same model Beam uses, there
> isn't really any difference for users between creating a pipeline with
> interactive runner and other runners.
> So we want to minimize the interfaces a user needs to learn while giving
> the user some capability to interact with the interactive environment.
>
> See this initial PR , the
> interactive_beam module will provide mainly 4 interfaces:
>
>- For advanced users who define pipeline outside __main__, let them
>tell current interactive environment where they define their pipeline:
>watch()
>   - This is very useful for tests where pipeline can be defined in
>   test methods.
>   - If the user simply creates pipeline in a Jupyter notebook or a
>   plain Python script, they don't have to know/use this feature at all.
>
>
This is for using visualize() below, or building further on the pipeline,
right?


>
>- Let users create an interactive pipeline: create_pipeline()
>   - invoking create_pipeline(), the user gets a Pipeline object that
>   works as any other Pipeline object created from apache_beam.Pipeline()
>   - However, the pipeline object p, when invoking p.run(), does some
>   extra interactive magic.
>   - We'll support interactive execution for DirectRunner at this
>   moment.
>
> How is this different than creating a pipeline with the interactive
runner? It'd be nice to reduce the number of new concepts a user needs to
know (and also reduce the number of changes needed to move from interactive
to non-interactive). Is there any need to limit this to the Direct runner?

>
>- Let users run the interactive pipeline as a normal pipeline:
>run_pipeline()
>   - In an interactive environment, a user only needs to add and
>   execute 1 line of code run_pipeline(pipeline) to execute any existing
>   interactive pipeline object as normal pipeline in any selected platform.
>   - We'll probably support Dataflow only. Other implementations can
>   be added though.
>
> Again, how is this different than pipeline.run()? What features require
limiting this to only certain runners?

>
>- Let users introspect any intermediate PCollection they have handler
>to: visualize()
>   - If a user ever writes pcoll = p | "Some Transform" >>
>   some_transform() ..., they can visualize(pcoll) once the pipeline p is
>   executed.
>   - p can be batch or streaming
>   - The visualization will be some plot graph of data for the given
>   PCollection as if it's materialized. If the PCollection is unbounded, 
> the
>   graph is dynamic.
>
> The PR will implement 1 and 2.
>
> We'll use https://issues.apache.org/jira/browse/BEAM-7923 as the top
> level JIRA and add blocking JIRAs as development goes.
>
> External Beam users will not worry about any of the underlying
> implementation details.
> Except the 4 interfaces above, they learn and write normal Beam code and
> can execute the pipeline immediately when they are done with prototyping.
>
> Ning.
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-07 Thread Robert Bradshaw
d deserialize the timestamp, window and pane 
>>> > properties in Flink. But currently FullWindowedValueCoder is used by 
>>> > default in WireCoders.addWireCoder, I suggest to make the coder 
>>> > configurable (i.e. allowing to use ValueOnlyWindowedValueCoder)
>>> >
>>> > 5) Currently if a coder is not defined in StandardCoders, it will be 
>>> > wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> 
>>> > LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few 
>>> > coders are defined in StandardCoders. It means that for most coder, a 
>>> > length will be added to the serialized bytes which is not necessary in my 
>>> > thoughts. My suggestion is maybe we can add some interfaces or tags for 
>>> > the coder which indicate whether the coder is needed a length prefix or 
>>> > not.
>>> >
>>> > 6) Set log level according to PipelineOption in Python SDK Harness. 
>>> > Currently the log level is set to INFO by default.
>>> >
>>> > 7) Allows to start up StatusServer according to PipelineOption in Python 
>>> > SDK Harness. Currently the StatusServer is start up by default.
>>> >
>>> > Although I put 3) 4) 5) into the "Nice to Have" as they are performance 
>>> > related, I still think they are very critical for Python UDF execution 
>>> > performance.
>>> >
>>> > Open questions:
>>> > -
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>> >
>>> > Currently we are preparing the design of how to support Python UDF in 
>>> > Flink based on the Beam portability framework and we will bring up the 
>>> > discussion in Flink community. We may propose more changes for Beam 
>>> > during that time and may need more support from Beam community.
>>> >
>>> > To be honest, I'm not an expert of Beam and so please feel free to 
>>> > correct me if my understanding is wrong. Welcome any feedback.
>>> >
>>> > Best,
>>> > Jincheng
>>>
>>>
>>>
>>>
>>>
>>>
>>> Cheers,
>>> Max
>>>
>>> On 31.07.19 12:16, Robert Bradshaw wrote:
>>> > Yep, Python support under active development,
>>> > e.g. https://github.com/apache/beam/pull/9188
>>> >
>>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun >> > <mailto:sunjincheng...@gmail.com>> wrote:
>>> >
>>> > Thanks a lot for sharing the link. I take a quick look at the design
>>> > and the implementation in Java and think it could address my
>>> > concern. It seems that it's still not supported in the Python SDK
>>> > Harness. Is there any plan on that?
>>> >
>>> > Robert Bradshaw mailto:rober...@google.com>>
>>> > 于2019年7月30日周二 下午12:33写道:
>>> >
>>> > On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>>> > mailto:sunjincheng...@gmail.com>> 
>>> > wrote:
>>> >
>>> >
>>> > Is it possible to add an interface such as
>>> > `isSelfContained()` to the `Coder`? This interface
>>> > indicates
>>> > whether the serialized bytes are self contained. If
>>> > it returns true, then there is no need to add a
>>> > prefixing length.
>>> > In this way, there is no need to introduce an extra
>>> > protocol,  Please correct me if I missed something :)
>>> >
>>> >
>>> > The question is how it is self contained. E.g.
>>> > DoubleCoder is self contained because it always uses
>>> > exactly 8 bytes, but one needs to know the double coder
>>> > to leverage this. VarInt coder is self-contained a
>>> > different way, as is StringCoder (which does just do
>>> > prefixing).
>>> >
>>> >
>>> > Yes, you are right! I think it again that we can not add
>>> > such interface for the coder, due to runner can not call it.
>>> > And just one more thought: does it make sense to add a
>>> > method such as "registerSelfContained Coder(xxx)" or so to
>>> > let users register the coders which can be processed in the
>>> > SDK Harness?  It's the responsibility of the SDK harness to
>>> > ensure that the coder is supported.
>>> >
>>> >
>>> > Basically, a "please don't add length prefixing to this coder,
>>> > assume everyone else can understand it (and errors will ensue if
>>> > anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>>> > there is not "the SDK"--there may be multiple other SDKs in
>>> > general, and of course runner components, some of which may
>>> > understand the coder in question and some of which may not.
>>> >
>>> > I would say that if this becomes a problem, we could look at the
>>> > pros and cons of various remedies, this being one alternative.
>>> >
>>> >
>>> >
>>> >
>>> > I am hopeful that schemas give us a rich enough way to
>>> > encode the vast majority of types that we will want to
>>> > transmit across language barriers (possibly with some
>>> > widening promotions). For high performance one will want
>>> > to use formats like arrow rather than one-off coders as
>>> > well, which also biases us towards the schema work. The
>>> > set of StandardCoders is not closed, and nor is the
>>> > possibility of figuring out a way to communicate outside
>>> > this set for a particular pair of languages, but I think
>>> > it makes sense to avoid going that direction unless we
>>> > have to due to the increased API surface aread and
>>> > complexity it imposes on all runners and SDKs.
>>> >
>>> >
>>> > Great! Could you share some links about the schema work. It
>>> > seems very interesting and promising.
>>> >
>>> >
>>> > https://beam.apache.org/contribute/design-documents/#sql--schema 
>>> > and
>>> > of particular relevance https://s.apache.org/beam-schemas
>>> >
>>> >
>>> >


Re: Collecting metrics in JobInvocation - BEAM-4775

2019-08-07 Thread Robert Bradshaw
I think the question here is whether PipelineRunner::run is allowed to
be blocking. If it is, then the futures make sense (but there's no way
to properly cancel it). I'm OK with not being able to return metrics
on cancel in this case, or the case the pipeline didn't even start up
yet. Otherwise, we should quickly get a handle to the PipelineResult
and be able to query that for all future use.

On Fri, Jul 26, 2019 at 6:04 PM Kenneth Knowles  wrote:
>
> Took a look at the code, too. It seems like a mismatch in a few ways
>
>  - PipelineRunner::run is async already and returns while the job is still 
> running
>  - PipelineResult is a legacy name - it is really meant to be a handle to a 
> running job
>  - cancel() on a future is just not really related to cancel() in a job. I 
> would expect to cancel a job with PipelineResult::cancel and I would expect 
> JobInvocation::cancel to cancel the "start job" RPC/request/whatever. So I 
> would not expect metrics for a job which I decided to not even start.
>
> Kenn
>
> On Fri, Jul 26, 2019 at 8:48 AM Łukasz Gajowy  wrote:
>>
>> Hi all,
>>
>> I'm currently working on BEAM-4775. The goal here is to pass portable 
>> MetricResults over the RPC API to the PortableRunner (SDK) part and allow 
>> reading them there. The metrics can be collected from the pipeline result 
>> that is available in JobInvocation's callbacks. The callbacks are registered 
>> in start() and cancel() methods of JobInvocation. This is the place where my 
>> problems begin:
>>
>> I want to access the pipeline result and get the MetricResults from it. This 
>> is possible only in onSuccess(PipelineResult result) method of the callbacks 
>> registered in start() and cancel() in JobInvocation. Now, when I cancel the 
>> job invocation, invocationFuture.cancel() is called and will result in 
>> invoking onFailure(Throwable throwable) in case the pipeline is still 
>> running. onFailure() has no PipelineResult parameter, hence there currently 
>> is no possibility to collect the metrics there.
>>
>> My questions currently are:
>>
>> Should we collect metrics after the job is canceled? So far I assumed that 
>> we should.
>> If so, does anyone have some other ideas on how to collect metrics so that 
>> we could collect them when canceling the job?
>>
>> PR I'm working on with more discussions on the topic: PR 9020
>> The current idea on how the metrics could be collected in JobInvocation: link
>>
>> Thanks,
>> Łukasz
>>


Re: (mini-doc) Beam (Flink) portable job templates

2019-08-07 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 6:20 AM Thomas Weise  wrote:
>
> Hi Kyle,
>
> [document doesn't have comments enabled currently]
>
> As noted, worker deployment is an open question. I believe pipeline 
> submission and worker execution need to be considered together for a complete 
> deployment story. The idea of creating a self containing jar file is 
> interesting, but there are trade-offs:
>
> * The pipeline construction code itself may need access to cluster resources. 
> In such cases the jar file cannot be created offline.

Could you elaborate?

> * For k8s deployment, a container image with the SDK and application code is 
> required for the worker. The jar file (which is really a derived artifact) 
> would need to be built in addition to the container image.

Yes. For standard use, a vanilla released Beam published SDK container
+ staged artifacts should be sufficient.

> * To build such jar file, the user would need a build environment with job 
> server and application code. Do we want to make that assumption?

Actually, it's probably much easier than that. A jar file is just a
zip file with a standard structure, to which one can easily add (data)
files without having a full build environment. The (pre-compiled) main
class would know how to read this data to construct the pipeline and
kick off the job just like any other Flink job.

> The document that I had shared discusses options for pipeline submission. It 
> might be interesting to explore if your proposal for building such a jar can 
> be integrated or if you have other comments?
>
> Thomas
>
>
>
> On Tue, Aug 6, 2019 at 5:03 PM Kyle Weaver  wrote:
>>
>> Hi all,
>>
>> Following up on discussion about portable Beam on Flink on Kubernetes [1], I 
>> have drafted a short document on how I propose we bundle portable Beam 
>> applications into jars that can be run on OSS runners, similar to Dataflow 
>> templates (but without the actual template part, at least for the first 
>> iteration). It's pretty straightforward, but I thought I would broadcast it 
>> here in case anyone is interested.
>>
>> https://docs.google.com/document/d/1kj_9JWxGWOmSGeZ5hbLVDXSTv-zBrx4kQRqOq85RYD4/edit#
>>
>> [1] 
>> https://lists.apache.org/thread.html/a12dd939c4af254694481796bc08b05bb1321cfaadd1a79cd3866584@%3Cdev.beam.apache.org%3E
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


Re: [PROPOSAL] An initial Schema API in Python

2019-08-06 Thread Robert Bradshaw
On Sun, Aug 4, 2019 at 12:03 AM Chad Dombrova  wrote:
>
> Hi,
>
> This looks like a great feature.
>
> Is there a plan to eventually support custom field types?
>
> I assume adding support for dataclasses in python 3.7+ should be trivial to 
> do in a follow up PR. Do you see any complications with that? The main 
> advantage that dataclasses have over NamedTuple in this context is argument 
> defaults, which is a nice convenience.

Java has a notion of logical types which has yet to be figured out in
a cross-langauge way but tackles this exact issue. I think there's a
lot of value in "anonymous" named tuples as intermediates well, e.g.
one might to a projection onto a subset of fields, and then do a
grouping/aggregating operation, in which case the new schema can be
inferred (even if it doesn't have a name).

> My PR as it is right now actually doesn’t even support int. I probably should 
> at least make a change to accept int as a type specification for iint64 but 
> throw an error when encoding if an int is too big.
>
> Should probably do the same for float.
>
> Another concern I have is, if there is a user function or a library that user 
> does not control, that uses typing to indicate that a function accepts a type 
> of int, would it be compatible with numpy types?
>
> I have similar concerns. I guess we’ll just have to cast to int before 
> passing into 3rd party code, which is not ideal. Why not use int for int64 in 
> python?

A properly written library should accept any type implementing the
__int__ (or __index__) methods in place of an int, rather than doing
explicit type checks, though performance may suffer. Likewise when
encoding, we should accept all sorts of ints when an int32 (say) is
expected, rather than force the user to know and cast to the right
type.

As for the mappings between Python types and schemas, there are
several mappings that are somewhat being conflated.

(1) There is the mapping used in definitions. At the moment, all
subclasses of NamedTuple map to the same generic Row schema type,
probably not something we want in the long run (but could be OK for
now if we think doing better in the future counts as backwards
compatible). For integral types, it makes sense to accept
np.int{8,16,32,64}, but should we accept the equivalent arrow types
here as well? I think we also need to accept the plain Python "int"
and "float" type, otherwise a standard Python class like

NamedTuple('Word', [('name', str), ('rank', int), ('frequency', float)]

will be surprisingly rejected.

(2) The mapping of concrete values to Python types. Rows mapping to
NamedTuples may give expectations beyond the attributes they offer
(and I'd imagine we'll want to be flexible with the possible
representations here, e.g. offering a slice of an arrow record batch).
Or do we need to pay the cost of re-creating the users NamedTuple
subclass. Ints are another interesting case--it may be quite
surprising to users for the returned values to have silent truncating
overflow semantics (very unlike Python) rather than the arbitrary
precision that Python's ints give (especially if the conversion from a
python int to an int64 happens due to an invisible fusion barrier).
Better to compute the larger value and then thrown an error if/when it
is encoded into a fixed width type later.

(3) The mapping of Python values into a row (e.g. for serialization).
If there are errors (e.g. a DoFn produces tuples of the wrong type),
how eagerly can we detect them? At what cost? How strict should we be
(e.g. if a named tuple with certain fields is expected, can we map a
concrete subclass to it? A NamedTuple that has a superset of the
fields? Implicitly mapping Python's float (aka a 64-bit C double) to a
float32 is a particularly sticky question.

I think we can make forward progress on implementation in parallel to
answering these questions, but we should be explicit and document what
the best options are here and then get the code to align.


Re: [ANNOUNCE] Beam 2.14.0 Released!

2019-08-02 Thread Robert Bradshaw
Lots of improvements all around. Thank you for pushing this through, Anton!

On Fri, Aug 2, 2019 at 1:37 AM Chad Dombrova  wrote:
>
> Nice work all round!  I love the release blog format with the highlights and 
> links to issues.
>
> -chad
>
>
> On Thu, Aug 1, 2019 at 4:23 PM Anton Kedin  wrote:
>>
>> The Apache Beam team is pleased to announce the release of version 2.14.0.
>>
>> Apache Beam is an open source unified programming model to define and
>> execute data processing pipelines, including ETL, batch and stream
>> (continuous) processing. See https://beam.apache.org
>>
>> You can download the release here:
>>
>> https://beam.apache.org/get-started/downloads/
>>
>> This release includes bugfixes, features, and improvements detailed on
>> the Beam blog: https://beam.apache.org/blog/2019/07/31/beam-2.14.0.html
>>
>> Thanks to everyone who contributed to this release, and we hope you enjoy
>> using Beam 2.14.0.
>>
>> -- Anton Kedin, on behalf of The Apache Beam team


Re: [ANNOUNCE] New committer: Jan Lukavský

2019-08-01 Thread Robert Bradshaw
Congratulations!

On Thu, Aug 1, 2019 at 9:59 AM Jan Lukavský  wrote:

> Thanks everyone!
>
> Looking forward to working with this great community! :-)
>
> Cheers,
>
>  Jan
> On 8/1/19 12:18 AM, Rui Wang wrote:
>
> Congratulations!
>
>
> -Rui
>
> On Wed, Jul 31, 2019 at 10:51 AM Robin Qiu  wrote:
>
>> Congrats!
>>
>> On Wed, Jul 31, 2019 at 10:31 AM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>>> Congratulations, Jan! Thank you for your contributions!
>>>
>>> On Wed, Jul 31, 2019 at 10:04 AM Tanay Tummalapalli 
>>> wrote:
>>>
 Congratulations!

 On Wed, Jul 31, 2019 at 10:05 PM Ahmet Altay  wrote:

> Congratulations Jan! Thank you for your contributions!
>
> On Wed, Jul 31, 2019 at 2:30 AM Ankur Goenka 
> wrote:
>
>> Congratulations Jan!
>>
>> On Wed, Jul 31, 2019, 1:23 AM David Morávek  wrote:
>>
>>> Congratulations Jan, well deserved! ;)
>>>
>>> D.
>>>
>>> On Wed, Jul 31, 2019 at 10:17 AM Ryan Skraba 
>>> wrote:
>>>
 Congratulations Jan!

 On Wed, Jul 31, 2019 at 10:10 AM Ismaël Mejía 
 wrote:
 >
 > Hi,
 >
 > Please join me and the rest of the Beam PMC in welcoming a new
 > committer: Jan Lukavský.
 >
 > Jan has been contributing to Beam for a while, he was part of the
 team
 > that contributed the Euphoria DSL extension, and he has done
 > interesting improvements for the Spark and Direct runner. He has
 also
 > been active in the community discussions around the Beam model and
 > other subjects.
 >
 > In consideration of Jan's contributions, the Beam PMC trusts him
 with
 > the responsibilities of a Beam committer [1].
 >
 > Thank you, Jan, for your contributions and looking forward to
 many more!
 >
 > Ismaël, on behalf of the Apache Beam PMC
 >
 > [1] https://beam.apache.org/committer/committer

>>>


Re: [DISCUSS] Integer coders used in SchemaCoder

2019-07-31 Thread Robert Bradshaw
The standard VARINT coder is used for all sorts of integer values (e.g. the
output of the CountElements transform), but the vast majority of them are
likely significantly less than a full 64 bits. In Python, declaring an
element type to be int will use this. On the other hand, using a VarInt
format for int8 seems quite wasteful. Where the cutoff is is probably
arbitrary, but the java 32-bit int type is often used as the generic (and
often small-ish) integer type in Java, whereas int16 is an explicit choice
where one knows that 16 bits is good enough, but 8 isn't.

It looks like Go use the VarInt encoding everywhere:
https://github.com/apache/beam/blob/release-2.14.0/sdks/go/pkg/beam/coder.go#L135
. Python, as mentioned, uses VarInt encoding everywhere as well.

(There's also the question of whether we want to introduce StandardCoders
for all of these, or if we'd rather move to using Schemas over Coders and
just define them as part of the RowCoder.)




On Tue, Jul 30, 2019 at 8:30 PM Brian Hulette  wrote:

> Forgot to include a link to the code. The mapping from primitive type to
> coders can be found here:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L44
>
> On Tue, Jul 30, 2019 at 11:24 AM Brian Hulette 
> wrote:
>
>> Currently the coders used for integer types in RowCoder (and thus
>> SchemaCoder) are inconsistent. For int32 and int64, we use VarIntCoder and
>> VarLongCoder which encode those types with variable width, but for byte and
>> int16 we use ByteCoder and BigEndianShortCoder, which are fixed width.
>>
>> Is it a conscious choice to use variable width coders just for the larger
>> width integers (where they could have the most benefit), or should we
>> consider normalizing these coders to always be fixed width?
>>
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread Robert Bradshaw
Yep, Python support under active development, e.g.
https://github.com/apache/beam/pull/9188

On Wed, Jul 31, 2019 at 9:24 AM jincheng sun 
wrote:

> Thanks a lot for sharing the link. I take a quick look at the design and
> the implementation in Java and think it could address my concern. It seems
> that it's still not supported in the Python SDK Harness. Is there any plan
> on that?
>
> Robert Bradshaw  于2019年7月30日周二 下午12:33写道:
>
>> On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
>> wrote:
>>
>>>
>>>>> Is it possible to add an interface such as `isSelfContained()` to the
>>>>> `Coder`? This interface indicates
>>>>> whether the serialized bytes are self contained. If it returns true,
>>>>> then there is no need to add a prefixing length.
>>>>> In this way, there is no need to introduce an extra protocol,  Please
>>>>> correct me if I missed something :)
>>>>>
>>>>
>>>> The question is how it is self contained. E.g. DoubleCoder is self
>>>> contained because it always uses exactly 8 bytes, but one needs to know the
>>>> double coder to leverage this. VarInt coder is self-contained a different
>>>> way, as is StringCoder (which does just do prefixing).
>>>>
>>>
>>> Yes, you are right! I think it again that we can not add such interface
>>> for the coder, due to runner can not call it. And just one more thought:
>>> does it make sense to add a method such as "registerSelfContained
>>> Coder(xxx)" or so to let users register the coders which can be processed
>>> in the SDK Harness?  It's the responsibility of the SDK harness to ensure
>>> that the coder is supported.
>>>
>>
>> Basically, a "please don't add length prefixing to this coder, assume
>> everyone else can understand it (and errors will ensue if anyone doesn't)"
>> at the user level? Seems a bit dangerous. Also, there is not "the
>> SDK"--there may be multiple other SDKs in general, and of course runner
>> components, some of which may understand the coder in question and some of
>> which may not.
>>
>> I would say that if this becomes a problem, we could look at the pros and
>> cons of various remedies, this being one alternative.
>>
>>
>>>
>>>
>>>> I am hopeful that schemas give us a rich enough way to encode the vast
>>>> majority of types that we will want to transmit across language barriers
>>>> (possibly with some widening promotions). For high performance one will
>>>> want to use formats like arrow rather than one-off coders as well, which
>>>> also biases us towards the schema work. The set of StandardCoders is not
>>>> closed, and nor is the possibility of figuring out a way to communicate
>>>> outside this set for a particular pair of languages, but I think it makes
>>>> sense to avoid going that direction unless we have to due to the increased
>>>> API surface aread and complexity it imposes on all runners and SDKs.
>>>>
>>>
>>> Great! Could you share some links about the schema work. It seems very
>>> interesting and promising.
>>>
>>
>> https://beam.apache.org/contribute/design-documents/#sql--schema and of
>> particular relevance https://s.apache.org/beam-schemas
>>
>>
>>
>


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-31 Thread Robert Bradshaw
On Wed, Jul 31, 2019 at 11:22 AM Valentyn Tymofieiev 
wrote:

> I have checked Portable Wordcount example on Flink and Spark on Python 2
> and Python 3.
>
> To do so, I had to checkout Beam from git repo, since using the source
> distribution does not include gradlew, and gradelw_orig did not work for
> me. Commands I ran:
>
> git checkout tags/v2.14.0-RC1
> ./gradlew :sdks:python:container:py3:docker
> ./gradlew :runners:flink:1.5:job-server:runShadow# Use  ./gradlew
> :runners:spark:job-server:runShadow for Spark
> ./gradlew :sdks:python:test-suites:portable:py35:portableWordCountBatch
>  -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK
> cat /tmp/py-wordcount-direct* # to verify results.
>
> Loopback scenarios worked, however DOCKER scenarios did not. Opened
> several Jiras to follow up:
>
> https://issues.apache.org/jira/browse/BEAM-7857
> https://issues.apache.org/jira/browse/BEAM-7858
> https://issues.apache.org/jira/browse/BEAM-7859
> <https://issues.apache.org/jira/browse/BEAM-7859?filter=-2>
>

I commented on the bugs, and I think this is due to trying to use Docker
mode with local files (a known issue).


> The gradle targets that were required to run these tests are not present
> in 2.13.0 branch, so I don't consider it a regression and still cast +1.
>

Agreed.


> On Wed, Jul 31, 2019 at 11:31 AM Ismaël Mejía  wrote:
>
>> Oups Robert pointed to me that I have probably not counted correctly.
>> There were indeed already 3 PMC +1 votes. Pablo, Robert and Ahmet.
>> Please excuse me for the extra noise.
>>
>> On Wed, Jul 31, 2019 at 9:46 AM Ismaël Mejía  wrote:
>> >
>> > To complete the release we need to have at least three +1 binding
>> > votes (votes from PMC members) as stated in [1]. So far we have only
>> > 2.
>> >
>> > Thomas (and the others). The blog post PR is now open [2] please help
>> > us add missing features or maybe to highlight the ones you consider
>> > important in the PR comments.
>> >
>> > Here it is the missing +1 (binding). Validated SHAs+signatures,
>> > beam-samples and one internal company project with the new jars.
>> > Compared source file vs tagged git repo. Everything looks ok.
>> >
>> > [1] https://www.apache.org/foundation/voting.html#ReleaseVotes
>> > [2] https://github.com/apache/beam/pull/9201/files
>> >
>> > On Wed, Jul 31, 2019 at 6:27 AM Anton Kedin  wrote:
>> > >
>> > > Ran various postcommits, validates runners, and nexmark against the
>> release branch. All looks good so far.
>> > >
>> > > Will take another look at the docs/blog and the nexmark numbers
>> tomorrow, but if nothing comes up I will close the vote tomorrow
>> (Wednesday) by 6pm PST (= Thursday 01:00am UTC) since it's over 72hours
>> since the vote has started and we have a number of +1s including PMC
>> members and no -1s.
>> > >
>> > > Regards,
>> > > Anton
>> > >
>> > > On Tue, Jul 30, 2019 at 8:13 PM Valentyn Tymofieiev <
>> valen...@google.com> wrote:
>> > >>
>> > >> I also ran unit tests for Python 3.7 and they passed as well. Cython
>> tests for python3.7 require  `apt-get install python3.7-dev`.
>> > >>
>> > >> On Wed, Jul 31, 2019 at 3:16 AM Pablo Estrada 
>> wrote:
>> > >>>
>> > >>> +1
>> > >>>
>> > >>> I installed from source, and ran unit tests for Python in 2.7, 3.5,
>> 3.6.
>> > >>>
>> > >>> Also ran a number of integration tests on Py 3.5 on Dataflow and
>> DirectRunner.
>> > >>> Best
>> > >>> -P.
>> > >>>
>> > >>> On Tue, Jul 30, 2019 at 11:09 AM Hannah Jiang <
>> hannahji...@google.com> wrote:
>> > >>>>
>> > >>>> I checked Py3 tests using .zip, mainly with direct runners, and
>> everything looks good, so +1.
>> > >>>>
>> > >>>> On Tue, Jul 30, 2019 at 2:08 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> > >>>>>
>> > >>>>> I checked all the artifact signatures and ran a couple test
>> pipelines with the wheels (Py2 and Py3) and everything looked good to me,
>> so +1.
>> > >>>>>
>> > >>>>> On Mon, Jul 29, 2019 at 8:29 PM Valentyn Tymofieiev <
>> valen...@google.com> wrote:
>> > >>>>>>
>> > >>>>>&g

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-30 Thread Robert Bradshaw
On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
wrote:

>
>>> Is it possible to add an interface such as `isSelfContained()` to the
>>> `Coder`? This interface indicates
>>> whether the serialized bytes are self contained. If it returns true,
>>> then there is no need to add a prefixing length.
>>> In this way, there is no need to introduce an extra protocol,  Please
>>> correct me if I missed something :)
>>>
>>
>> The question is how it is self contained. E.g. DoubleCoder is self
>> contained because it always uses exactly 8 bytes, but one needs to know the
>> double coder to leverage this. VarInt coder is self-contained a different
>> way, as is StringCoder (which does just do prefixing).
>>
>
> Yes, you are right! I think it again that we can not add such interface
> for the coder, due to runner can not call it. And just one more thought:
> does it make sense to add a method such as "registerSelfContained
> Coder(xxx)" or so to let users register the coders which can be processed
> in the SDK Harness?  It's the responsibility of the SDK harness to ensure
> that the coder is supported.
>

Basically, a "please don't add length prefixing to this coder, assume
everyone else can understand it (and errors will ensue if anyone doesn't)"
at the user level? Seems a bit dangerous. Also, there is not "the
SDK"--there may be multiple other SDKs in general, and of course runner
components, some of which may understand the coder in question and some of
which may not.

I would say that if this becomes a problem, we could look at the pros and
cons of various remedies, this being one alternative.


>
>
>> I am hopeful that schemas give us a rich enough way to encode the vast
>> majority of types that we will want to transmit across language barriers
>> (possibly with some widening promotions). For high performance one will
>> want to use formats like arrow rather than one-off coders as well, which
>> also biases us towards the schema work. The set of StandardCoders is not
>> closed, and nor is the possibility of figuring out a way to communicate
>> outside this set for a particular pair of languages, but I think it makes
>> sense to avoid going that direction unless we have to due to the increased
>> API surface aread and complexity it imposes on all runners and SDKs.
>>
>
> Great! Could you share some links about the schema work. It seems very
> interesting and promising.
>

https://beam.apache.org/contribute/design-documents/#sql--schema and of
particular relevance https://s.apache.org/beam-schemas


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-30 Thread Robert Bradshaw
I checked all the artifact signatures and ran a couple test pipelines with
the wheels (Py2 and Py3) and everything looked good to me, so +1.

On Mon, Jul 29, 2019 at 8:29 PM Valentyn Tymofieiev 
wrote:

> I have checked Python 3 batch and streaming quickstarts on Dataflow runner
> using .zip and wheel distributions. So far +1 from me.
>
> On Mon, Jul 29, 2019 at 7:53 PM Ahmet Altay  wrote:
>
>> +1, validated python 2 quickstarts.
>>
>> On Fri, Jul 26, 2019 at 5:46 PM Ahmet Altay  wrote:
>>
>>> To confirm, I manuall validated leader board on python. It is working.
>>>
>>> On Fri, Jul 26, 2019 at 5:23 PM Yifan Zou  wrote:
>>>
 AFAIK, there should not be any special prerequisites for this. Things
 the script does including:
 1. download the python rc in zip
 2. start virtualenv and install the sdk.
 3. verify hash.
 4. config settings.xml and start a Java pubsub message injector.
 5. run game examples and validate.

 Could you double check if the sdk was installed properly (step 1&2)?

>>>
>>> I also guessing this is the case. Probably something earlier in the
>>> validation script did not run as expected.
>>>
>>>


>>> Yifan

 On Fri, Jul 26, 2019 at 2:38 PM Anton Kedin  wrote:

> Validation script fails for me when I try to run [1] python
> leaderboard with direct runner:
>
> ```
> *
> * Running Python Leaderboard with DirectRunner
> *
> /usr/bin/python: No module named apache_beam.examples.complete.game
> ```
>
> If someone has more context, what are the prerequisites for this step?
> How does it look up the module?
>
> [1]
> https://github.com/apache/beam/blob/master/release/src/main/scripts/run_rc_validation.sh#L424
>
> Regards,
> Anton
>
> On Fri, Jul 26, 2019 at 10:23 AM Anton Kedin  wrote:
>
>> Cool, will make the post and will update the release guide as well
>> then
>>
>> On Fri, Jul 26, 2019 at 10:20 AM Chad Dombrova 
>> wrote:
>>
>>> I think the release guide needs to be updated to remove the
 optionality of blog creation and avoid confusion. Thanks for pointing 
 that
 out.

>>>
>>> +1
>>>
>>>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
On Mon, Jul 29, 2019 at 4:14 PM jincheng sun 
wrote:

> Hi Robert,
>
> Thanks for your detail comments, I would have added a few pointers inline.
>
> Best,
> Jincheng
>
> Robert Bradshaw  于2019年7月29日周一 下午12:35写道:
>
>> On Sun, Jul 28, 2019 at 6:51 AM jincheng sun 
>> wrote:
>> >
>> > Hi, Thomas & Robert, Thanks for your comments and providing relevant
>> discussions and JIRA links, very helpful to me!
>> >
>> > I am glad to see your affirmative response,  And I am glad to add my
>> thoughts on the comment you left:
>> > -
>> >
>> > >> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the runner's
>> API. For example, BagState, ValueState, ReducingState, CombiningState,  can
>> all be implemented on top of a runner-offered MapState in the SDK. On the
>> one hand, there's a desire to keep the number of "primitive" states types
>> to a minimum (to ease the use of authoring runners), but if a runner can
>> perform a specific optimization due to knowing about the particular state
>> type it might be preferable to pass it through rather than emulate it in
>> the SDK.
>> > ---
>> > Agree. Regarding MapState, it's definitely needed as it cannot be
>> implemented on top of the existing BagState.
>> > Regarding ValueState, it can be implemented on top of BagState.
>> However, we can do optimization if we know a state is ValueState.
>> > For example, if a key is updated with a new value, if the ValueState is
>> implemented on top of BagState, two RPC calls are needed
>> > to write the new value back to runner: clear + append; if we know it's
>> ValueState, just one RPC call is enough: set.
>> > We can discuss case by case whether a state type is needed.
>>
>> In the Beam APIs [1] multiple state requests are consumed as a stream
>> in a single RPC, so clear followed by append still has low overhead.
>> Is that optimization not sufficient?
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
>>
>>
> Actually there are two kinds of overhead:
> 1) the RPC overhead(I think in this point  may be sufficient for RPC)
> 2) the state read/write overhead, i.e., If there is no optimization, the
> runner needs to clear the state firstly and then set a new value for the
> state.
>

It's certainly an option to keep open. I'd avoid prematurely optimizing
until we have evidence that it matters.


>
> > ---
>> >
>> > >> Note that in the protos, the GRPC ports have a coder attribute
>> > specifically to allow this kind of customization (and the SDKs should
>> > be respecting that). We've also talked about going beyond per-element
>> > encodings (e.g. using arrow to serialize entire batches across the
>> > whire). I think all the runner code simply uses the default and we
>> > could be more intelligent there.
>> > ---
>> >
>> > Yes, the gRPC allows to use customization coder. However, I'm afraid
>> that this is not enough as we want to use
>> > Beam's portability framework by depending on the modules used
>> (beam-runners-java-fn-execution and the Python SDK Harness) instead
>> > of copying that part of code to Flink. So it should also allow to use
>> the customization coder in beam-runners-java-fn-execution.
>> > Otherwise, we have to copy a lot of code to Flink to use the
>> customization coder.
>>
>> Agreed, beam-runners-java-fn-execution does not take advantage of the
>> full flexibility of the protocol, and would make a lot of sense to
>> enhance it to be able to.
>>
>> > ---
>> >
>> > >> I'm wary of having too many buffer size configuration options (is
>> > there a compelling reason to make it bigger or smaller?) but something
>> > timebased would be very useful.
>> > ---
>> >
>> > I think the default values of buffer size are not needed to change for
>> most cases. I'm not sure for just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
>> > Will 1MB makes more sense?
>>
>> IIRC, 10MB was the point at which, according to benchmarks Luke did
>> quite a while ago, there was clearly no performance benefit in making
>> it larger. Coupled with a time-based threshold, I don't see much of an
>> advantage to low

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
ment types that can be passed through "standard"
> coders.
> ---
>
> The design makes sense to me. My concern is that if a coder is not among the 
> StandardCoders, it will be prefixed with a length even if the harness knows 
> how to decode it.

If the harness knows how to decode it, the length prefixing is just a
lost optimization opportunity, but it'll still work. Whether this is a
big enough loss to merit introducing an extra protocol to negotiate on
the set of commonly known coders beyond standard coders is still TBD,
but probably not for v1 (and possibly not ever, especially as schemas
become more expressive).

> Besides, I'm also curious about the standard whether a coder can be put into 
> StandardCoders.
> For example, I noticed that FLOAT is not among StandardCoders, while DOUBLE 
> is among it.

StandardCoders is supposed to be some sort of lowest common
denominator, but theres no hard and fast criteria. For this example,
some languages (e.g. Python) don't have the notion of FLOAT, and using
a FLOAT coder for Python floats (whose underling representation is
double) gets tricky as this coder is not faithful. We also don't have
specific int coders for smaller-than-64-bit types which, like float,
are easily promoted.

> Best, Jincheng
>
> Robert Bradshaw  于2019年7月25日周四 下午2:00写道:
>>
>> On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
>> >
>> > Hi Jincheng,
>> >
>> > It is very exciting to see this follow-up, that you have done your 
>> > research on the current state and that there is the intention to join 
>> > forces on the portability effort!
>> >
>> > I have added a few pointers inline.
>> >
>> > Several of the issues you identified affect our usage of Beam as well. 
>> > These present an opportunity for collaboration.
>>
>> +1, a lot of this aligns with improvements we'd like to make as well.
>>
>> > On Wed, Jul 24, 2019 at 2:53 AM jincheng sun  
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Thanks Max and all of your kind words. :)
>> >>
>> >> Sorry for the late reply as I'm busy working on the Flink 1.9 release. 
>> >> For the next major release of Flink, we plan to add Python user defined 
>> >> functions(UDF, UDTF, UDAF) support in Flink and I have go over the Beam 
>> >> portability framework and think that it is perfect for our requirements. 
>> >> However we also find some improvements needed for Beam:
>> >>
>> >> Must Have:
>> >> 
>> >> 1) Currently only BagState is supported in gRPC protocol and I think we 
>> >> should support more kinds of state types, such as MapState, ValueState, 
>> >> ReducingState, CombiningState(AggregatingState in Flink), etc. That's 
>> >> because these kinds of state will be used in both user-defined function 
>> >> or Flink Python DataStream API.
>> >
>> > There has been discussion about the need for different state types and to 
>> > efficiently support those on the runner side there may be a need to look 
>> > at the over the wire representation also.
>> >
>> > https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
>> > https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E
>>
>> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the
>> runner's API. For example, BagState, ValueState, ReducingState,
>> CombiningState,  can all be implemented on top of a runner-offered
>> MapState in the SDK. On the one hand, there's a desire to keep the
>> number of "primitive" states types to a minimum (to ease the use of
>> authoring runners), but if a runner can perform a specific
>> optimization due to knowing about the particular state type it might
>> be preferable to pass it through rather than emulate it in the SDK.
>>
>> >> 2) There are warnings that Python 3 is not fully supported in Beam 
>> >> (beam/sdks/python/setup.py). We should support Python 3.x for the beam 
>> >> portability framework due to Python 2 will be not supported officially.
>> >
>> > This must be obsolete per latest comments on: 
>> > https://issues.apache.org/jira/browse/BEAM-1251
>> >
>> >> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory 
&g

Re: Sort Merge Bucket - Action Items

2019-07-26 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov  wrote:
>
> Hi Gleb,
>
> Regarding the future of io.Read: ideally things would go as follows
> - All runners support SDF at feature parity with Read (mostly this is just 
> the Dataflow runner's liquid sharding and size estimation for bounded 
> sources, and backlog for unbounded sources, but I recall that a couple of 
> other runners also used size estimation)
> - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to 
> add any new implementations to SDK, and users shouldn't use them either 
> (note: I believe it's already effectively forbidden to use them for cases 
> where a DoFn/SDF at the current level of support will be sufficient)
> - People one by one rewrite existing Bounded/UnboundedSource based 
> PTransforms in the SDK to use SDFs instead
> - Read.from() is rewritten to use a wrapper SDF over the given Source, and 
> explicit support for Read is deleted from runners
> - In the next major version of Beam - presumably 3.0 - the Read transform 
> itself is deleted
>
> I don't know what's the current status of SDF/Read feature parity, maybe Luke 
> or Cham can comment. An alternative path is offered in 
> http://s.apache.org/sdf-via-source.

Python supports initial splitting for SDF of all sources on portable
runners. Dataflow support for batch SDF is undergoing testing, not yet
rolled out. Dataflow support for streaming SDF is awaiting portable
state/timer support.

> On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov  wrote:
>>
>> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away 
>> in favor of SDF, or we are always going to have both?
>>
>> I was looking into AvroIO.read and AvroIO.readAll, both of them use 
>> AvroSource. AvroIO.readAll is using SDF, and it's implemented with 
>> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at 
>> ReadAllViaFileBasedSource I find it not necessary to use Source, it 
>> should be enough to have something like (KV, 
>> OutputReceiver), as we have discussed in this thread, and that should be 
>> fine for SMB as well. It would require duplicating code from AvroSource, but 
>> in the end, I don't see it as a problem if AvroSource is going away.
>>
>> I'm attaching a small diagram I put for myself to better understand the code.
>>
>> AvroIO.readAll :: PTransform> ->
>>
>> FileIO.matchAll :: PTransform, 
>> PCollection>
>> FileIO.readMatches :: PTransform, 
>> PCollection>
>> AvroIO.readFiles :: PTransform, 
>> PCollection> ->
>>
>> ReadAllViaFileBasedSource :: PTransform, 
>> PCollection> ->
>>
>> ParDo.of(SplitIntoRangesFn :: DoFn> OffsetRange>>) (splittable do fn)
>>
>> Reshuffle.viaRandomKey()
>>
>> ParDo.of(ReadFileRangesFn(createSource) :: DoFn> OffsetRange>, T>) where
>>
>> createSource :: String -> FileBasedSource
>>
>> createSource = AvroSource
>>
>>
>> AvroIO.read without getHintMatchedManyFiles() :: PTransform> PCollection> ->
>>
>> Read.Bounded.from(createSource) where
>>
>> createSource :: String -> FileBasedSource
>>
>> createSource = AvroSource
>>
>>
>> Gleb
>>
>>
>> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw  wrote:
>>>
>>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>>> >
>>> > From the peanut gallery, keeping a separate implementation for SMB seems 
>>> > fine. Dependencies are serious liabilities for both upstream and 
>>> > downstream. It seems like the reuse angle is generating extra work, and 
>>> > potentially making already-complex implementations more complex, instead 
>>> > of helping things.
>>>
>>> +1
>>>
>>> To be clear, what I care about is that WriteFiles(X) and
>>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>>> TFRecord, ...}. In other words composability of the API (vs. manually
>>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>>> opportunities for (easy, clean) implementation sharing, that'd be
>>> nice, but not the primary goal.
>>>
>>> (Similarly for reading, though that's seem less obvious. Certainly
>>> whatever T is useful for ReadSmb(T) could be useful for a
>>> (non-liquid-shading) ReadAll(T) however.)
>>>
>>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:
>>> >>
>>> >> I spoke too soon. Turns out

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari
 wrote:
>
> So, If an RPC call has to be performed for a batch of Rows(PCollection), 
> instead of each Row, the recommended way is to batch the Rows in 
> startBundle() of 
> DoFn(https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.

> I thought Stateful and Timely Processing could be helpful here.

The upside is that you can persist state across bundles (which is
especially helpful when bundles are small, e.g. for streaming
pipelines). The downside is that you can't persist state across keys
(and it also enforces a shuffle to colocate the data by key).

If you get to choose your keys, you would want to have about as many
keys as you have concurrent bundles (or some small multiple, to ensure
they're not lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be
sufficient.

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an 
>> > example of assigning an arbitrary-but-consistent index to each element on 
>> > a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say, 
>> > PCollection with Fixed Windows, the state is maintained per window 
>> > and every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed in 
>> > a single DoFn Instance, which otherwise could have been done in multiple 
>> > parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>> >
>> > Thanks,
>> > Rahul


<    2   3   4   5   6   7   8   9   10   11   >