Re: Custom shardingFn for FileIO

2019-05-08 Thread Jozef Vilcek
Yes, I was able to use it in Flink and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.

On Wed, May 8, 2019 at 7:19 AM Reuven Lax  wrote:

> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek  wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax  wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek 
>>> wrote:
>>>
 That coder is added as an extra re-map stage from the "original" key to the new
 ShardAwareKey ... but the pipeline might get broken, I guess.
 Very fair point. I am having a second thought pass over this and will
 try to simplify it much more

 On Wed, May 1, 2019 at 2:12 PM Reuven Lax  wrote:

> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update post this PR will have
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek 
> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the pipeline to digest old keys from GBK restored
>> state and also work with new keys produced to GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax  wrote:
>>
>>> Initial thought on PR: we usually try to limit changing coders in
>>> these types of transforms to better support runners that allow in-place
>>> updates of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek 
>>> wrote:
>>>
 I have created a PR for enhancing WriteFiles for custom sharding
 function.
 https://github.com/apache/beam/pull/8438

 If this sort of change looks good, then the next step would be to use
 it in a Flink runner transform override. Let me know what you think

 On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek 
 wrote:

> I guess it is fine to enable shardingFn control only at the WriteFiles
> level rather than FileIO. On WriteFiles it can be manipulated via a
> PTransformOverride by the runner.
>
> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax 
> wrote:
>
>> Yes, a hook would have to be added to allow specifying a different
>> function for choosing the shard number (I assume the problem is that
>> there are cases where the current random assignment is not good?).
>> However, this can be set using a PTransformOverride; we ideally
>> shouldn't force the user to know details of the runner when writing
>> their code.
>>
>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <
>> m...@apache.org> wrote:
>>
>>> Reuven is talking about PTransformOverride, e.g.
>>> FlinkTransformOverrides. We already use this to determine the number
>>> of shards in case of Runner-determined sharding.
>>>
>>> Not sure if that would work for Jozef's case because setting the
>>> number of shards is not enough. We want to set the shard key directly
>>> and that logic is buried inside WriteFiles.
>>>
>>> -Max
>>>
>>> On 25.04.19 16:30, Reuven Lax wrote:
>>> > Actually the runner is free to perform surgery on the graph. The
>>> > FlinkRunner can insert a custom function to determine the sharding
>>> > keys.
>>> >
>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>> >
>>> > Right now, sharding can be specified only via a target `shardCount`,
>>> > be it by the user or the runner. Next to the configurable shardCount,
>>> > I am proposing to also be able to pass a function which will allow
>>> > the user (or runner) to control how the shard is determined and what
>>> > key will be used to represent it:
>>> >
>>> > interface ShardingFunction[UserT, DestinationT, ShardKeyT] extends
>>> > Serializable {
>>> > ShardKeyT assign(DestinationT destination, UserT element, shardCo
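
For readers following the proposal above, a minimal Java sketch of what the
truncated interface could look like, plus one possible deterministic
implementation. The shardCount parameter and the hash-based example are
assumptions for illustration only; the actual API is the one in the linked
PRs (8438 / 8499).

import java.io.Serializable;

// Hypothetical completion of the interface sketched in the quoted mail;
// the shardCount parameter and the generics are assumptions, not the
// final API from the PR.
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
}

// A runner (or user) could then plug in a deterministic assignment, e.g.
// hashing each element to one of shardCount keys instead of picking a
// random shard, which is what makes memory usage more predictable.
class HashSharding<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    return Math.floorMod(element.hashCode(), shardCount);
  }
}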

Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Udi Meiri
From a Python type hints perspective, how do schemas fit? Type hints are
currently used to determine which coder to use.
It seems that given a schema field, it would be useful to be able to
convert it to a coder (using URNs?), and to convert the coder into a typing
type.
This would allow for pipeline-construction-time type compatibility checks.

Some questions:
1. Why are there 4 types of int (byte, int16, int32, int64)? Is it to
maintain type fidelity when writing back? If so, what happens in languages
that only have "int"?
2. What is encoding_position? How does it differ from id (which is also a
position)?
3. When are schema protos constructed? Are they available during pipeline
construction or afterwards?
4. Once data is read into a Beam pipeline and a schema inferred, do we
maintain the schema types throughout the pipeline or use language-local
types?
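
On the schema-field-to-coder direction asked about above, the Java SDK
already exposes this mapping; a minimal sketch (in Java rather than Python,
just to show the shape; field names are illustrative):

import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;

public class SchemaToCoder {
  public static void main(String[] args) {
    // A schema fully determines a coder for its rows, so the
    // field-to-coder conversion is mechanical on the Java side.
    Schema schema = Schema.builder()
        .addStringField("name")
        .addInt32Field("age")
        .build();
    RowCoder coder = RowCoder.of(schema);
    System.out.println(coder);
  }
}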


On Wed, May 8, 2019 at 6:39 PM Robert Bradshaw  wrote:

> From: Reuven Lax 
> Date: Wed, May 8, 2019 at 10:36 PM
> To: dev
>
> > On Wed, May 8, 2019 at 1:23 PM Robert Bradshaw 
> wrote:
> >>
> >> Very excited to see this. In particular, I think this will be very
> >> useful for cross-language pipelines (not just SQL, but also for
> >> describing non-trivial data, e.g. for source and sink reuse).
> >>
> >> The proto specification makes sense to me. The only thing that looks
> >> like it's missing (other than possibly iterable, for arbitrarily-large
> >> support) is multimap. Another basic type, should we want to support
> >> it, is union (though this of course can get messy).
> >
> > multimap is an interesting suggestion. Do you have a use case in mind?
> >
> > union (or oneof) is also a good suggestion. There are good use cases for
> this, but this is a more fundamental change.
>
> No specific usecase, they just seemed to round out the options.
>
> >> I'm curious what the rationale was for going with a oneof for type_info
> >> rather than repeated components like we do with coders.
> >
> > No strong reason. Do you think repeated components is better than oneof?
>
> It's more consistent with how we currently do coders (which has pros and
> cons).
>
> >> Removing DATETIME as a logical coder on top of INT64 may cause issues
> >> of insufficient resolution and/or timespan. Similarly with DECIMAL (or
> >> would it be backed by string?)
> >
> > There could be multiple TIMESTAMP types for different resolutions, and
> they don't all need the same backing field type. E.g. the backing type for
> nanoseconds could be Row(INT64, INT64), or it could just be a byte array.
>
> Hmm. What would the value be in supporting different types of
> timestamps? Would all SDKs have to support all of them? Can one
> compare, take differences, etc. across timestamp types? (As Luke
> points out, the other conversation on timestamps is likely relevant
> here as well.)
>
> >> The biggest question, as far as portability is concerned at least, is
> >> the notion of logical types. serialized_class is clearly not portable,
> >> and I also think we'll want a way to share semantic meaning across
> >> SDKs (especially if things like dates become logical types). Perhaps
> >> URNs (+payloads) would be a better fit here?
> >
> > Yes, URN + payload is probably the better fit for portability.
> >
> >> Taking a step back, I think it's worth asking why we have different
> >> types, rather than simply making everything a LogicalType of bytes
> >> (aka coder). Other than encoding format, the answer I can come up with
> >> is that the type decides the kinds of operations that can be done on
> >> it, e.g. does it support comparison? Arithmetic? Containment?
> >> Higher-level date operations? Perhaps this should be used to guide the
> >> set of types we provide.
> >
> > Also even though we could make everything a LogicalType (though at least
> byte array would have to stay primitive), I think  it's useful to have a
> slightly larger set of primitive types.  It makes things easier to
> understand and debug, and it makes it simpler for the various SDKs to map
> them to their types (e.g. mapping to POJOs).
>
>  This would be the case if one didn't have LogicalType at all, but
> once one introduces that one now has this more complicated two-level
> hierarchy of types which doesn't seem simpler to me.
>
> I'm trying to understand what information Schema encodes that a
> NamedTupleCoder (or RowCoder) would/could not. (Coders have the
> disadvantage that there are multiple encodings of a single value, e.g.
> BigEndian vs. VarInt, but if we have multiple resolutions of timestamp
> that would still seem to be an issue. Possibly another advantage is
> encoding into non-record-oriented formats, e.g. Parquet or Arrow, that
> have a set of primitives.)
>




Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Robert Bradshaw
From: Reuven Lax 
Date: Wed, May 8, 2019 at 10:36 PM
To: dev

> On Wed, May 8, 2019 at 1:23 PM Robert Bradshaw  wrote:
>>
>> Very excited to see this. In particular, I think this will be very
>> useful for cross-language pipelines (not just SQL, but also for
>> describing non-trivial data, e.g. for source and sink reuse).
>>
>> The proto specification makes sense to me. The only thing that looks
>> like it's missing (other than possibly iterable, for arbitrarily-large
>> support) is multimap. Another basic type, should we want to support
>> it, is union (though this of course can get messy).
>
> multimap is an interesting suggestion. Do you have a use case in mind?
>
> union (or oneof) is also a good suggestion. There are good use cases for 
> this, but this is a more fundamental change.

No specific usecase, they just seemed to round out the options.

>> I'm curious what the rationale was for going with a oneof for type_info
>> rather than repeated components like we do with coders.
>
> No strong reason. Do you think repeated components is better than oneof?

It's more consistent with how we currently do coders (which has pros and cons).

>> Removing DATETIME as a logical coder on top of INT64 may cause issues
>> of insufficient resolution and/or timespan. Similarly with DECIMAL (or
>> would it be backed by string?)
>
> There could be multiple TIMESTAMP types for different resolutions, and they 
> don't all need the same backing field type. E.g. the backing type for 
> nanoseconds could be Row(INT64, INT64), or it could just be a byte array.

Hmm. What would the value be in supporting different types of
timestamps? Would all SDKs have to support all of them? Can one
compare, take differences, etc. across timestamp types? (As Luke
points out, the other conversation on timestamps is likely relevant
here as well.)

>> The biggest question, as far as portability is concerned at least, is
>> the notion of logical types. serialized_class is clearly not portable,
>> and I also think we'll want a way to share semantic meaning across
>> SDKs (especially if things like dates become logical types). Perhaps
>> URNs (+payloads) would be a better fit here?
>
> Yes, URN + payload is probably the better fit for portability.
>
>> Taking a step back, I think it's worth asking why we have different
>> types, rather than simply making everything a LogicalType of bytes
>> (aka coder). Other than encoding format, the answer I can come up with
>> is that the type decides the kinds of operations that can be done on
>> it, e.g. does it support comparison? Arithmetic? Containment?
>> Higher-level date operations? Perhaps this should be used to guide the
>> set of types we provide.
>
> Also even though we could make everything a LogicalType (though at least byte 
> array would have to stay primitive), I think  it's useful to have a slightly 
> larger set of primitive types.  It makes things easier to understand and 
> debug, and it makes it simpler for the various SDKs to map them to their 
> types (e.g. mapping to POJOs).

 This would be the case if one didn't have LogicalType at all, but
once one introduces that one now has this more complicated two-level
hierarchy of types which doesn't seem simpler to me.

I'm trying to understand what information Schema encodes that a
NamedTupleCoder (or RowCoder) would/could not. (Coders have the
disadvantage that there are multiple encodings of a single value, e.g.
BigEndian vs. VarInt, but if we have multiple resolutions of timestamp
that would still seem to be an issue. Possibly another advantage is
encoding into non-record-oriented formats, e.g. Parquet or Arrow, that
have a set of primitives.)
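
As a rough illustration of the URN + payload idea discussed above, a plain
Java value class. Everything here is hypothetical (the class, field names,
and example URN are invented), only to show what would need to travel across
SDKs for a logical type:

// Hypothetical illustration only; not an existing Beam proto or API.
class LogicalTypeSpec {
  // Cross-SDK identifier carrying the semantic meaning,
  // e.g. "beam:logical_type:datetime:v1" (made-up URN).
  String urn;
  // Opaque arguments, interpreted only by SDKs that understand the URN.
  byte[] payload;
  // The schema field type that backs the logical type on the wire,
  // e.g. "INT64" for a millisecond timestamp.
  String representation;
}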


Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Lukasz Cwik
Are you suggesting that schemas become an explicit field on PCollection, or
that the coder on PCollections has a well-known schema coder type whose
payload has field names, ids, types, ...?
I'm much more for the latter since it allows for versioning schema
representations over time without needing a change to the protos.

On Wed, May 8, 2019 at 1:36 PM Reuven Lax  wrote:

>
>
> On Wed, May 8, 2019 at 1:23 PM Robert Bradshaw 
> wrote:
>
>> Very excited to see this. In particular, I think this will be very
>> useful for cross-language pipelines (not just SQL, but also for
>> describing non-trivial data, e.g. for source and sink reuse).
>>
>> The proto specification makes sense to me. The only thing that looks
>> like it's missing (other than possibly iterable, for arbitrarily-large
>> support) is multimap. Another basic type, should we want to support
>> it, is union (though this of course can get messy).
>>
>
> multimap is an interesting suggestion. Do you have a use case in mind?
>
> union (or oneof) is also a good suggestion. There are good use cases for
> this, but this is a more fundamental change.
>
>
>> I'm curious what the rationale was for going with a oneof for type_info
>> rather than repeated components like we do with coders.
>>
>
> No strong reason. Do you think repeated components is better than oneof?
>
>
>> Removing DATETIME as a logical coder on top of INT64 may cause issues
>> of insufficient resolution and/or timespan. Similarly with DECIMAL (or
>> would it be backed by string?)
>>
>
> There could be multiple TIMESTAMP types for different resolutions, and
> they don't all need the same backing field type. E.g. the backing type for
> nanoseconds could be Row(INT64, INT64), or it could just be a byte array.
>

This seems to overlap heavily with the discussion about timestamp precision
in this other ML thread[1].


>
>
>>
>> The biggest question, as far as portability is concerned at least, is
>> the notion of logical types. serialized_class is clearly not portable,
>> and I also think we'll want a way to share semantic meaning across
>> SDKs (especially if things like dates become logical types). Perhaps
>> URNs (+payloads) would be a better fit here?
>>
>
> Yes, URN + payload is probably the better fit for portability.
>

+1


>
>>
>> Taking a step back, I think it's worth asking why we have different
>> types, rather than simply making everything a LogicalType of bytes
>> (aka coder). Other than encoding format, the answer I can come up with
>> is that the type decides the kinds of operations that can be done on
>> it, e.g. does it support comparison? Arithmetic? Containment?
>> Higher-level date operations? Perhaps this should be used to guide the
>> set of types we provide.
>>
>
> Also even though we could make everything a LogicalType (though at least
> byte array would have to stay primitive), I think  it's useful to have a
> slightly larger set of primitive types.  It makes things easier to
> understand and debug, and it makes it simpler for the various SDKs to map
> them to their types (e.g. mapping to POJOs).
>
>
>> (Also, +1 to optional over nullable.)
>>
>
> sounds good. do others prefer optional as well?
>

Can rows backed by schemas have unset fields? If so, wouldn't you want to
differentiate between unset and null, which means you would need to support
both null and optional?
I know in proto2, unset vs null was distinct but with proto3, that
distinction was removed.


>
>>
>> From: Reuven Lax 
>> Date: Wed, May 8, 2019 at 6:54 PM
>> To: dev
>>
>> > Beam Java's support for schemas is just about done: we infer schemas
>> from a variety of types, we have a variety of utility transforms (join,
>> aggregate, etc.) for schemas, and schemas are integrated with the ParDo
>> machinery. The big remaining task I'm working on is writing documentation
>> and examples for all of this so that users are aware. If you're interested,
>> these slides from the London Beam meetup show a bit more how schemas can be
>> used and how they simplify the API.
>> >
>> > I want to start integrating schemas into portability so that they can
>> be used from other languages such as Python (in particular this will also
>> allow BeamSQL to be invoked from other languages). In order to do this, the
>> Beam portability protos must have a way of representing schemas. Since this
>> has not been discussed before, I'm starting this discussion now on the list.
>> >
>> > As a reminder: a schema represents the type of a PCollection as a
>> collection of fields. Each field has a name, an id (position), and a field
>> type. A field type can be either a primitive type (int, long, string, byte
>> array, etc.), a nested row (itself with a schema), an array, or a map.
>> >
>> > We also support logical types. A logical type is a way for the user to
>> embed their own types in schema fields. A logical type is always backed by
>> a schema type, and contains a function for mapping the user's logical type
>> to the field 

Re: [discuss] Reducing log verbosity for Python failures?

2019-05-08 Thread Ahmet Altay
In that case compaction makes much more sense. My experience was that when a
single test in a suite fails, I end up seeing lots of logs (usually proto
dumps) from unrelated tests.

*From: *Pablo Estrada 
*Date: *Wed, May 8, 2019 at 3:48 PM
*To: *dev

My impression - and I might be wrong [see meme at 1], is that these are
> logged in debug mode, but when the test fails, the test runner dumps them
> all to stdout.
> Best
> -P.
>
> [1]
> https://i1.wp.com/gifrific.com/wp-content/uploads/2015/02/Chris-Farley-Oh-God-Theyre-Gonna-Know-Im-Dumb-Conan-Interview.gif?ssl=1
>
> *From: *Ahmet Altay 
> *Date: *Wed, May 8, 2019 at 3:13 PM
> *To: *dev
>
> +1 It is hard to debug with lots of log messages. And if anybody is using
>> them for development we can make those logs debug logs and hide them by
>> default.
>>
>> *From: *Robert Bradshaw 
>> *Date: *Wed, May 8, 2019 at 3:01 PM
>> *To: *dev
>>
>> +1 to making them significantly more compact in most cases.
>>>
>>> From: Pablo Estrada 
>>> Date: Wed, May 8, 2019 at 11:35 PM
>>> To: dev
>>>
>>> > Hello all,
>>> > Some tests in Python have the problem that when they fail, lots of
>>> internal logging is dumped onto stdout, and we end up having to scroll way
>>> up to find the actual stack trace for the failed test. This logging, as far
>>> as i can tell, is dumping of fn api protos.
>>> >
>>> > Does anyone use these logs to look into the test failure? I would like
>>> to find a way to make these more compact, or maybe just stop logging them
>>> (people who need them can choose to log them in their local setup?).
>>> >
>>> > I lean towards making them more compact (by, for instance, writing
>>> functions that log their information in a more compact fashion); but I
>>> would like to hear thoughts from others.
>>> >
>>> > So thoughts? : )
>>> > -P.
>>>
>>


Re: [discuss] Reducing log verbosity for Python failures?

2019-05-08 Thread Pablo Estrada
My impression - and I might be wrong [see meme at 1], is that these are
logged in debug mode, but when the test fails, the test runner dumps them
all to stdout.
Best
-P.

[1]
https://i1.wp.com/gifrific.com/wp-content/uploads/2015/02/Chris-Farley-Oh-God-Theyre-Gonna-Know-Im-Dumb-Conan-Interview.gif?ssl=1

*From: *Ahmet Altay 
*Date: *Wed, May 8, 2019 at 3:13 PM
*To: *dev

+1 It is hard to debug with lots of log messages. And if anybody is using
> them for development we can make those logs debug logs and hide them by
> default.
>
> *From: *Robert Bradshaw 
> *Date: *Wed, May 8, 2019 at 3:01 PM
> *To: *dev
>
> +1 to making them significantly more compact in most cases.
>>
>> From: Pablo Estrada 
>> Date: Wed, May 8, 2019 at 11:35 PM
>> To: dev
>>
>> > Hello all,
>> > Some tests in Python have the problem that when they fail, lots of
>> internal logging is dumped onto stdout, and we end up having to scroll way
>> up to find the actual stack trace for the failed test. This logging, as far
>> as i can tell, is dumping of fn api protos.
>> >
>> > Does anyone use these logs to look into the test failure? I would like
>> to find a way to make these more compact, or maybe just stop logging them
>> (people who need them can choose to log them in their local setup?).
>> >
>> > I lean towards making them more compact (by, for instance, writing
>> functions that log their information in a more compact fashion); but I
>> would like to hear thoughts from others.
>> >
>> > So thoughts? : )
>> > -P.
>>
>


Re: [discuss] Reducing log verbosity for Python failures?

2019-05-08 Thread Ahmet Altay
+1 It is hard to debug with lots of log messages. And if anybody is using
them for development we can make those logs debug logs and hide them by
default.

*From: *Robert Bradshaw 
*Date: *Wed, May 8, 2019 at 3:01 PM
*To: *dev

+1 to making them significantly more compact in most cases.
>
> From: Pablo Estrada 
> Date: Wed, May 8, 2019 at 11:35 PM
> To: dev
>
> > Hello all,
> > Some tests in Python have the problem that when they fail, lots of
> internal logging is dumped onto stdout, and we end up having to scroll way
> up to find the actual stack trace for the failed test. This logging, as far
> as i can tell, is dumping of fn api protos.
> >
> > Does anyone use these logs to look into the test failure? I would like
> to find a way to make these more compact, or maybe just stop logging them
> (people who need them can choose to log them in their local setup?).
> >
> > I lean towards making them more compact (by, for instance, writing
> functions that log their information in a more compact fashion); but I
> would like to hear thoughts from others.
> >
> > So thoughts? : )
> > -P.
>


Re: [discuss] Reducing log verbosity for Python failures?

2019-05-08 Thread Robert Bradshaw
+1 to making them significantly more compact in most cases.

From: Pablo Estrada 
Date: Wed, May 8, 2019 at 11:35 PM
To: dev

> Hello all,
> Some tests in Python have the problem that when they fail, lots of internal 
> logging is dumped onto stdout, and we end up having to scroll way up to find 
> the actual stack trace for the failed test. This logging, as far as i can 
> tell, is dumping of fn api protos.
>
> Does anyone use these logs to look into the test failure? I would like to 
> find a way to make these more compact, or maybe just stop logging them 
> (people who need them can choose to log them in their local setup?).
>
> I lean towards making them more compact (by, for instance, writing functions 
> that log their information in a more compact fashion); but I would like to 
> hear thoughts from others.
>
> So thoughts? : )
> -P.


Re: Python SDK timestamp precision

2019-05-08 Thread Robert Bradshaw
From: Kenneth Knowles 
Date: Wed, May 8, 2019 at 6:50 PM
To: dev

> This got pretty long, but I don't yet want to end it, because there's not 
> quite yet a solution that will allow a user to treat timestamps from most 
> systems as Beam timestamps.

+1, it'd be really nice to find a solution to this.

> I'm cutting pieces just to make inline replies easier to read.
>
> On Tue, Apr 23, 2019 at 9:03 AM Robert Bradshaw  wrote:
>>
>> On Tue, Apr 23, 2019 at 4:20 PM Kenneth Knowles  wrote:
>> >  -  WindowFn must receive exactly the data that came from the user's data 
>> > source. So that cannot be rounded.
>> >  - The user's WindowFn assigns to a window, so it can contain arbitrary 
>> > precision as it should be grouped as bytes.
>> >  - End of window, timers, watermark holds, etc, are all treated only as 
>> > bounds, so can all be rounded based on their use as an upper or lower 
>> > bound.
>> >
>> > We already do a lot of this - Pubsub publish timestamps are microsecond 
>> > precision (you could say our current connector constitutes data loss) as 
>> > are Windmill timestamps (since these are only combines of Beam timestamps 
>> > here there is no data loss). There are undoubtedly some corner cases I've 
>> > missed, and naively this might look like duplicating timestamps so that 
>> > could be an unacceptable performance concern.
>>
>> If I understand correctly, in this scheme WindowInto assignment is
>> parameterized by a function that specifies how to parse/extract the
>> timestamp from the data element (maybe just a field specifier for
>> schema'd data) rather than store the (exact) timestamp in a standard
>> place in the WindowedValue, and the window merging always goes back to
>> the SDK rather than the possibility of it being handled runner-side.
>
> This sounds promising. You could also store the extracted approximate 
> timestamp somewhere, of course.
>
>> Even if the runner doesn't care about interpreting the window, I think
>> we'll want to have compatible window representations (and timestamp
>> representations, and windowing fns) across SDKs (especially for
>> cross-language) which favors choosing a consistent resolution.
>>
>> The end-of-window, for firing, can be approximate, but it seems it
>> should be exact for timestamp assignment of the result (and similarly
>> with the other timestamp combiners).
>
> I was thinking that the window itself should be stored as exact data, while 
> just the firing itself is approximated, since it already is, because of 
> watermarks and timers.

I think this works where we can compare encoded windows, but some
portable interpretation of windows is required for runner-side
implementation of merging windows (for example).

There may also be issues if windows (or timestamps) are assigned to a
high precision in one SDK, then inspected/acted on in another SDK, and
then passed back to the original SDK where the truncation would be
visible.

> You raise a good point that min/max timestamp combiners require actually 
> understanding the higher-precision timestamp. I can think of a couple things 
> to do. One is the old "standardize all 3 or 4 precisions we need" and the
> other is that combiners other than EOW exist primarily to hold the watermark, 
> and that hold does not require the original precision. Still, neither of 
> these is that satisfying.

In the current model, the output timestamp is user-visible.

>> > A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown 
>> > as proto (int64 seconds since epoch + int32 nanos within second). It has 
>> > legacy classes that use milliseconds, and Joda itself now encourages 
>> > moving back to Java's new Instant type. Nanoseconds should complicate the 
>> > arithmetic only for the one person authoring the date library, which they 
>> > have already done.
>>
>> The encoding and decoding need to be done in a language-consistent way
>> as well.
>
> I honestly am not sure what you mean by "language-consistent" here.

If we want to make reading and writing of timestamps, windows
cross-language, we can't rely on language-specific libraries to do the
encoding.

>> Also, most date libraries don't have division, etc. operators, so
>> we have to do that as well. Not that it should be *that* hard.
>
> If the libraries dedicated to time handling haven't found it needful, is 
> there a specific reason you raise this? We do some simple math to find the 
> window things fall into; is that it?

Yes. E.g.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77

would be a lot messier if there were no mapping date libraries to raw
ints that we can do arithmetic on. Writing this with the (seconds,
nanos) representation is painful. But I suppose we'd only have to do
it once per SDK.
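
To make the arithmetic point concrete, a simplified sketch (not the actual
FixedWindows code): with a single integer timestamp the window start is one
modulo expression, while a (seconds, nanos) pair needs explicit normalization
that every SDK would have to repeat.

// Simplified sketch; not the actual Beam FixedWindows implementation.
class WindowMath {
  // Timestamps as a single long (millis or nanos since epoch).
  static long windowStart(long timestamp, long size, long offset) {
    return timestamp - Math.floorMod(timestamp - offset, size);
  }

  // Timestamps as a (seconds, nanos) pair: the same computation has to
  // normalize the pair by hand. This variant only handles whole-second
  // window sizes; sub-second sizes would need carry/borrow across fields.
  static long[] windowStart(long seconds, int nanos, long sizeSeconds, long offsetSeconds) {
    long adjusted = seconds - offsetSeconds;
    long startSeconds = offsetSeconds + (adjusted - Math.floorMod(adjusted, sizeSeconds));
    return new long[] {startSeconds, 0};
  }
}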

>> >> It would also be really nice to clean up the infinite-future being the
>> >> somewhat arbitrary max micros rounded to millis, and
>> >> end-of-global-

[discuss] Reducing log verbosity for Python failures?

2019-05-08 Thread Pablo Estrada
Hello all,
Some tests in Python have the problem that when they fail, lots of internal
logging is dumped onto stdout, and we end up having to scroll way up to
find the actual stack trace for the failed test. This logging, as far as i
can tell, is dumping of fn api protos.

Does anyone use these logs to look into the test failure? I would like to
find a way to make these more compact, or maybe just stop logging them
(people who need them can choose to log them in their local setup?).

I lean towards making them more compact (by, for instance, writing
functions that log their information in a more compact fashion); but I
would like to hear thoughts from others.

So thoughts? : )
-P.


Re: [discuss] A tweak to the Python API for SDF?

2019-05-08 Thread Pablo Estrada
Hello all,
The API has been updated for Python (See
https://github.com/apache/beam/pull/8430). Please, if you catch any
documentation that needs updating, flag to me or just propose the change : )

As for Java - we didn't end up determining whether it makes sense to update
the API as well. Thoughts from others?

In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to
track this for Java.

Best
-P.
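
For the Java side of this question, below is the existing annotation-driven
pattern (StateSpec / TimerSpec) that a restriction provider could mirror; the
RestrictionSpec / RestrictionId names in the trailing comment are invented,
only to visualize what the analogous Java change might look like:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class StatefulFn extends DoFn<KV<String, Integer>, Integer> {
  // The declared specs tell the runner what to create...
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  // ...and the annotated parameters are the runtime objects it injects.
  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("count") ValueState<Integer> count,
      @TimerId("flush") Timer flush) {
    count.write(c.element().getValue());
  }

  // A hypothetical mirror for restrictions (names invented, not an API):
  //   @RestrictionId("range")
  //   private final RestrictionSpec rangeSpec = RestrictionSpecs.of(new MyProvider());
  //   ...with the matching restriction tracker injected into @ProcessElement.
}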

On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik  wrote:

> Pablo, all the SplittableDoFn stuff is marked as @Experimental so one is
> able to change it. There really is only one complicated one to change in
> Watch.java, the rest are quite straightforward.
>
> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada  wrote:
>
>> Thanks all,
>>  @Luke - I imagine that would be an improvement to the API, but this may
>> be harder as this is already available to users, and there are those who
>> have implemented SDFs under the current API. Would it be possible to make a
>> backwards-compatible change to the API here?
>>
>> For the Python changes, I've proposed a pull request:
>> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
>> : ) - All comments welcome please.
>>
>> +Boyuan Zhang  I am happy to wait for your
>> SyntheticSource PR to be merged and make the appropriate changes if you'd
>> like.
>> Best
>> -P.
>>
>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik  wrote:
>>
>>> Would it make sense to also do this in the Java SDK?
>>>
> That would make the restriction provider also mirror the TimerSpec and
> StateSpec, which use annotations similar to how it's done in Python.
>>>
>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw 
>>> wrote:
>>>
 +1 to introducing this Param for consistency (and making the
 substitution more obvious), and I think SDF is still new/experimental
 enough we can do this. I don't know if we need Spec in addition to
 Param and Provider.

 On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <
 chamik...@google.com> wrote:
 >
 >
 >
 > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada 
 wrote:
 >>
 >> Hi all,
 >> Sorry about the wall of text.
 >> So, first of all, I thought about this while reviewing a PR by
 Boyuan with an example of an SDF[1]. This is very exciting btw : ).
 >>
 >> Anyway... I certainly have a limited view of the whole SDF effort,
 but I think it's worth discussing this particular point about the API
 before finalizing SDF and making it widely available. So here I go:
 >>
 >> The Python API for SDF asks users to provide a restriction provider
 in their process function signature. More or less the following:
 >>
 >> class MyOwnLittleSDF(beam.DoFn):
 >>   def process(self, element,
 >>   restriction_tracker=MyOwnLittleRestrictionProvider()):
 >> # My DoFn logic...
 >>
 >> This is all fine, but something that I found a little odd is that
 the restriction provider gets replaced at runtime with a restriction
 tracker:
 >>
 >> class MyOwnLittleSDF(beam.DoFn):
 >>   def process(self, element,
 >>   restriction_tracker=MyOwnLittleRestrictionProvider()):
 >> # This assert succeeds : )
 >> assert not isinstance(restriction_tracker,
 >>   MyOwnLittleRestrictionProvider)
 >>
 >> After thinking a little bit about it, I realized that the default
 argument simply allows us to inform the runner where to find the
 restriction provider; but that the thing that we need at runtime is NOT the
 restriction provider - but rather, the restriction tracker.
 >>
 >> A similar pattern occurs with state and timers, where the runner
 needs to know the sort of state, the coder for the values in that state (or
 the time domain for timers); but the runtime parameter is different[2]. For
 state and timers (and window, timestamp, pane, etc.) we provide a pattern
 where users give a default value that is clearly a placeholder:
 beam.DoFn.TimerParam, or beam.DoFn.StateParam.
 >
 >
 > This is the way (new) DoFns work for the Python SDK. The SDK (harness)
 identifies the meanings of different (potential) arguments to a DoFn using
 pre-defined default values.
 >
 >>
 >>
 >> In this case, the API is fairly similar, but (at least in my
 imagination), it is much more clear about how the DoFnParam will be
 replaced with something else at runtime. A similar change could be done for
 SDF:
 >>
 >> class MyOwnLittleSDF(beam.DoFn):
 >>   MY_RESTRICTION = \
 >>   RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
 >>
 >>   def process(
 >>   self, element,
 >>
  restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
 >> # My DoFn logic..
 >
 >
 >
 > If I understood correctly, what you propose is similar to the
 existi

Re: Requesting contributor permission for Beam JIRA tickets

2019-05-08 Thread Lukasz Cwik
Welcome, I have assigned BEAM-7240 to you.

On Wed, May 8, 2019 at 10:30 AM Ajo Thomas  wrote:

> Hello,
>
> I am Ajo Thomas and I was hoping to work on making some improvements to
> the beam KinesisIO java SDK.
> I have created a ticket for it [
> https://issues.apache.org/jira/browse/BEAM-7240 ] and was hoping to
> assign it to myself.
>
> Requesting the admins to please add me as a contributor for Beam's JIRA
> issue tracker.
> My ASF JIRA Id: ajo.thomas24
>
> Thanks,
> Ajo Thomas
>
>
>
>


Re: [DISCUSS] Backwards compatibility of @Experimental features

2019-05-08 Thread Kenneth Knowles
On Wed, May 8, 2019 at 9:29 AM Ahmet Altay  wrote:

>
>
> *From: *Kenneth Knowles 
> *Date: *Wed, May 8, 2019 at 9:24 AM
> *To: *dev
>
>
>>
>> On Fri, Apr 19, 2019 at 3:09 AM Ismaël Mejía  wrote:
>>
>>> It seems we mostly agree that @Experimental is important, and that API
>>> changes (removals) on experimental features should happen quickly but still
>>> give some time to users so the Experimental purpose is not lost.
>>>
>>> Ahmet's proposal, given our current release calendar, is close to 2
>>> releases. Can we settle this on 2 releases as a 'minimum time' before
>>> removal? (This will leave maintainers the option to choose to support it for
>>> more time if they want, as discussed in the related KafkaIO thread, but still
>>> be friendly to users.)
>>>
>>> Do we agree?
>>>
>>
>> This sounds pretty good to me.
>>
>
> Sounds good to me too.
>
>
>> How can we manage this? Right now we tie most activities (like
>> re-triaging flakes) to the release process, since it is the only thing that
>> happens regularly for the community. If we don't have some forcing then I
>> expect the whole thing will just be forgotten.
>>
>
> Can we pre-create a list of future releases in JIRA, and for each
> experimental feature require that a JIRA issue is created for resolving the
> experimental status and tag it with the release that will happen after the
> minimum time period?
>

Great idea. I just created the 2.15.0 release so it reaches far enough
ahead for right now.

Kenn


>
>> Kenn
>>
>>
>>>
>>> Note: for the other subjects (e.g. when an Experimental feature should
>>> become not experimental) I think we will hardly find an agreement so I
>>> think this should be treated in a per case basis by the maintainers, but if
>>> you want to follow up on that discussion we can open another thread for
>>> this.
>>>
>>>
>>>
>>> On Sat, Apr 6, 2019 at 1:04 AM Ahmet Altay  wrote:
>>>
 I agree that Experimental feature is still very useful. I was trying to
 argue that we diluted its value so +1 to reclaim that.

 Back to the original question, in my opinion removing existing
 "experimental and deprecated" features in n=1 release will confuse users.
 This will likely be a surprise to them because we have been maintaining
 this state release after release now. I would propose in the next release
 warning users of such a change happening and give them at least 3 months to
 upgrade to suggested newer paths. In the future we can have a shorter
 timelines assuming that we will set the user expectations right.

 On Fri, Apr 5, 2019 at 3:01 PM Ismaël Mejía  wrote:

> I agree 100% with Kenneth on the multiple advantages that the
> Experimental feature gave us. I can also count multiple places where this
> has been essential in modules other than core. I disagree with the claim that
> the @Experimental annotation has lost its sense; it is simply ill defined, and
> probably that is by design, because its advantages come from it.
>
> Most of the topics in this thread are a consequence of this loose
> definition, e.g. (1) not defining how a feature becomes stable, and (2)
> what to do when we want to remove an experimental feature, are things that
> we need to decide whether to define or just continue to handle as we do today.
>
> Defining a target for graduating an Experimental feature is a bit too
> aggressive with not much benefit, in this case we could be losing the
> advantages of Experimental (save if we could change the proposed version 
> in
> the future). This probably makes sense for the removal of features but
> makes less sense to decide when some feature becomes stable. Of course in
> the case of the core SDKs packages this is probably more critical but
> nothing guarantees that things will be ready when we expect too. When will
> we tag for stability things like SDF or portability APIs?. We cannot
> predict the future for completion of features.
>
> Nobody has mentioned the LTS releases - couldn't these be like the
> middle points for these decisions? That at least would give LTS some value,
> because so far I still have trouble understanding the value of this idea
> given that we can do a minor release of any pre-released version.
>
> This debate is super important and nice to have, but we lost focus on
> my initial question. I like the proposal to remove a deprecated
> experimental feature (or part of it) after one release, in particular if
> the feature has a clear replacement path; however, for cases like the
> removal of previously supported versions of Kafka, one release may be too
> short. Other opinions on this? (or the other topics).
>
> On Fri, Apr 5, 2019 at 10:52 AM Robert Bradshaw 
> wrote:
>
>> if it's technically feasible, I am also in favor of requiring
>> experimental features to be (per-tag, Python should be updated) opt-in
>> only. We 

Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Reuven Lax
On Wed, May 8, 2019 at 1:23 PM Robert Bradshaw  wrote:

> Very excited to see this. In particular, I think this will be very
> useful for cross-language pipelines (not just SQL, but also for
> describing non-trivial data, e.g. for source and sink reuse).
>
> The proto specification makes sense to me. The only thing that looks
> like it's missing (other than possibly iterable, for arbitrarily-large
> support) is multimap. Another basic type, should we want to support
> it, is union (though this of course can get messy).
>

multimap is an interesting suggestion. Do you have a use case in mind?

union (or oneof) is also a good suggestion. There are good use cases for
this, but this is a more fundamental change.


> I'm curious what the rationale was for going with a oneof for type_info
> rather than repeated components like we do with coders.
>

No strong reason. Do you think repeated components is better than oneof?


> Removing DATETIME as a logical coder on top of INT64 may cause issues
> of insufficient resolution and/or timespan. Similarly with DECIMAL (or
> would it be backed by string?)
>

There could be multiple TIMESTAMP types for different resolutions, and they
don't all need the same backing field type. E.g. the backing type for
nanoseconds could be Row(INT64, INT64), or it could just be a byte array.



>
> The biggest question, as far as portability is concerned at least, is
> the notion of logical types. serialized_class is clearly not portable,
> and I also think we'll want a way to share semantic meaning across
> SDKs (especially if things like dates become logical types). Perhaps
> URNs (+payloads) would be a better fit here?
>

Yes, URN + payload is probably the better fit for portability.


>
>
> Taking a step back, I think it's worth asking why we have different
> types, rather than simply making everything a LogicalType of bytes
> (aka coder). Other than encoding format, the answer I can come up with
> is that the type decides the kinds of operations that can be done on
> it, e.g. does it support comparison? Arithmetic? Containment?
> Higher-level date operations? Perhaps this should be used to guide the
> set of types we provide.
>

Also even though we could make everything a LogicalType (though at least
byte array would have to stay primitive), I think  it's useful to have a
slightly larger set of primitive types.  It makes things easier to
understand and debug, and it makes it simpler for the various SDKs to map
them to their types (e.g. mapping to POJOs).


> (Also, +1 to optional over nullable.)
>

sounds good. do others prefer optional as well?


>
>
> From: Reuven Lax 
> Date: Wed, May 8, 2019 at 6:54 PM
> To: dev
>
> > Beam Java's support for schemas is just about done: we infer schemas
> from a variety of types, we have a variety of utility transforms (join,
> aggregate, etc.) for schemas, and schemas are integrated with the ParDo
> machinery. The big remaining task I'm working on is writing documentation
> and examples for all of this so that users are aware. If you're interested,
> these slides from the London Beam meetup show a bit more how schemas can be
> used and how they simplify the API.
> >
> > I want to start integrating schemas into portability so that they can be
> used from other languages such as Python (in particular this will also
> allow BeamSQL to be invoked from other languages). In order to do this, the
> Beam portability protos must have a way of representing schemas. Since this
> has not been discussed before, I'm starting this discussion now on the list.
> >
> > As a reminder: a schema represents the type of a PCollection as a
> collection of fields. Each field has a name, an id (position), and a field
> type. A field type can be either a primitive type (int, long, string, byte
> array, etc.), a nested row (itself with a schema), an array, or a map.
> >
> > We also support logical types. A logical type is a way for the user to
> embed their own types in schema fields. A logical type is always backed by
> a schema type, and contains a function for mapping the user's logical type
> to the field type. You can think of this as a generalization of a coder:
> while a coder always maps the user type to a byte array, a logical type can
> map to an int, or a string, or any other schema field type (in fact any
> coder can always be used as a logical type for mapping to byte-array field
> types). Logical types are used extensively by Beam SQL to represent SQL
> types that have no correspondence in Beam's field types (e.g. SQL has 4
> different date/time types). Logical types for Beam schemas have a lot of
> similarities to AVRO logical types.
> >
> > An initial proto representation for schemas is here. Before we go
> further with this, I would like community consensus on what this
> representation should be. I can start by suggesting a few possible changes
> to this representation (and hopefully others will suggest others):
> >
> > Kenn Knowles has suggested removin
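
To ground the field model described above, a small Java sketch using the
existing Schema and Row builders; field names are illustrative and this shows
only the Java-side API, not the proposed proto representation:

import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class SchemaSketch {
  public static void main(String[] args) {
    // A nested row type.
    Schema location = Schema.builder()
        .addDoubleField("lat")
        .addDoubleField("lon")
        .build();

    // Primitive, nullable, nested, array and map fields, mirroring the
    // field types listed in the proposal.
    Schema user = Schema.builder()
        .addStringField("name")
        .addInt64Field("id")
        .addNullableField("nickname", FieldType.STRING)
        .addRowField("home", location)
        .addArrayField("tags", FieldType.STRING)
        .addMapField("attrs", FieldType.STRING, FieldType.INT32)
        .build();

    // Row is the generic value type carrying that schema.
    Row home = Row.withSchema(location).addValues(52.5, 13.4).build();
    Row row = Row.withSchema(user)
        .addValues("ada", 1L, null, home,
            Arrays.asList("a", "b"),
            Collections.singletonMap("k", 1))
        .build();
    System.out.println(row);
  }
}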

Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Robert Bradshaw
Very excited to see this. In particular, I think this will be very
useful for cross-language pipelines (not just SQL, but also for
describing non-trivial data, e.g. for source and sink reuse).

The proto specification makes sense to me. The only thing that looks
like it's missing (other than possibly iterable, for arbitrarily-large
support) is multimap. Another basic type, should we want to support
it, is union (though this of course can get messy).

I'm curious what the rationale was for going with a oneof for type_info
rather than repeated components like we do with coders.

Removing DATETIME as a logical coder on top of INT64 may cause issues
of insufficient resolution and/or timespan. Similarly with DECIMAL (or
would it be backed by string?)

The biggest question, as far as portability is concerned at least, is
the notion of logical types. serialized_class is clearly not portable,
and I also think we'll want a way to share semantic meaning across
SDKs (especially if things like dates become logical types). Perhaps
URNs (+payloads) would be a better fit here?


Taking a step back, I think it's worth asking why we have different
types, rather than simply making everything a LogicalType of bytes
(aka coder). Other than encoding format, the answer I can come up with
is that the type decides the kinds of operations that can be done on
it, e.g. does it support comparison? Arithmetic? Containment?
Higher-level date operations? Perhaps this should be used to guide the
set of types we provide.

(Also, +1 to optional over nullable.)


From: Reuven Lax 
Date: Wed, May 8, 2019 at 6:54 PM
To: dev

> Beam Java's support for schemas is just about done: we infer schemas from a 
> variety of types, we have a variety of utility transforms (join, aggregate, 
> etc.) for schemas, and schemas are integrated with the ParDo machinery. The 
> big remaining task I'm working on is writing documentation and examples for 
> all of this so that users are aware. If you're interested, these slides from 
> the London Beam meetup show a bit more how schemas can be used and how they 
> simplify the API.
>
> I want to start integrating schemas into portability so that they can be used 
> from other languages such as Python (in particular this will also allow 
> BeamSQL to be invoked from other languages). In order to do this, the Beam 
> portability protos must have a way of representing schemas. Since this has 
> not been discussed before, I'm starting this discussion now on the list.
>
> As a reminder: a schema represents the type of a PCollection as a collection 
> of fields. Each field has a name, an id (position), and a field type. A field 
> type can be either a primitive type (int, long, string, byte array, etc.), a 
> nested row (itself with a schema), an array, or a map.
>
> We also support logical types. A logical type is a way for the user to embed 
> their own types in schema fields. A logical type is always backed by a schema 
> type, and contains a function for mapping the user's logical type to the 
> field type. You can think of this as a generalization of a coder: while a 
> coder always maps the user type to a byte array, a logical type can map to an 
> int, or a string, or any other schema field type (in fact any coder can 
> always be used as a logical type for mapping to byte-array field types). 
> Logical types are used extensively by Beam SQL to represent SQL types that 
> have no correspondence in Beam's field types (e.g. SQL has 4 different 
> date/time types). Logical types for Beam schemas have a lot of similarities 
> to AVRO logical types.
>
> An initial proto representation for schemas is here. Before we go further 
> with this, I would like community consensus on what this representation 
> should be. I can start by suggesting a few possible changes to this 
> representation (and hopefully others will suggest others):
>
> Kenn Knowles has suggested removing DATETIME as a primitive type, and instead 
> making it a logical type backed by INT64 as this keeps our primitive types 
> closer to "classical" PL primitive types. This also allows us to create 
> multiple versions of this type - e.g. TIMESTAMP(millis), TIMESTAMP(micros), 
> TIMESTAMP(nanos).
> If we do the above, we can also consider removing DECIMAL and making that a 
> logical type as well.
> The id field is currently used for some performance optimizations only. If we 
> formalized the idea of schema types having ids, then we might be able to use 
> this to allow self-recursive schemas (self-recursive types are not currently 
> allowed).
> Beam Schemas currently have an ARRAY type. However Beam supports "large 
> iterables" (iterables that don't fit in memory that the runner can page in), 
> and this doesn't match well to arrays. I think we need to add an ITERABLE 
> type as well to support things like GroupByKey results.
>
> It would also be interesting to explore allowing well-known metadata tags on 
> fields that Beam interprets. e.g. key and 

Re: Kotlin iterator error

2019-05-08 Thread Reuven Lax
Does Kotlin have a different Iterable than the standard Java one?

On Wed, May 8, 2019 at 10:47 AM Kenneth Knowles  wrote:

> Filed https://issues.apache.org/jira/browse/BEAM-7247 and assigned
> initially to +Reuven Lax  who is knowledgeable about
> the @Element annotation and analysis.
>
> Kenn
>
> On Wed, May 8, 2019 at 2:40 AM Maximilian Michels  wrote:
>
>> Hi Ankur,
>>
>> I've left a comment. Looking at the stack trace [1], this looks like a
>> problem with our Reflection analysis.
>>
>> -Max
>>
>> [1]
>>
>> https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-stack-trace-txt
>>
>> On 04.05.19 00:56, Ankur Goenka wrote:
>> > Hi,
>> >
>> > A Beam user on Stack Overflow has posted an issue while using the Kotlin SDK.
>> >
>> https://stackoverflow.com/questions/55908999/kotlin-iterable-not-supported-in-apache-beam/55911859#55911859
>> > I am not very familiar with kotlin so can someone please take a look.
>> >
>> > Thanks,
>> > Ankur
>>
>


Re: Coder Evolution

2019-05-08 Thread Lukasz Cwik
There was a thread about coder update in the past here[1]. Also, Reuven
sent out a doc[2] about pipeline drain and update which was discussed in
this thread[3]. I believe there have been more references to pipeline
update in other threads when people tried to change coder encodings in the
past as well.

Reuven/Dan are the best contacts about this on how this works inside of
Google, the limitations and other ideas that had been proposed.

1:
https://lists.apache.org/thread.html/f3b2daa740075cc39dc04f08d1eaacfcc2d550cecc04e27be024cf52@%3Cdev.beam.apache.org%3E
2:
https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#
3:
https://lists.apache.org/thread.html/37c9aa4aa5011801a7f060bf3f53e687e539fa6154cc9f1c544d4f7a@%3Cdev.beam.apache.org%3E

On Wed, May 8, 2019 at 11:45 AM Maximilian Michels  wrote:

> Hi,
>
> I'm looking into updating the Flink Runner to Flink version 1.8. Since
> version 1.7 Flink has a new optional interface for Coder evolution*.
>
> When a Flink pipeline is checkpointed, CoderSnapshots are written out
> alongside with the checkpointed data. When the pipeline is restored from
> that checkpoint, the CoderSnapshots are restored and used to
> reinstantiate the Coders.
>
> Furthermore, there is a compatibility and migration check between the
> old and the new Coder. This allows to determine whether
>
>   - The serializer did not change or is compatible (ok)
>   - The serialization format of the coder changed (ok after migration)
>   - The coder needs to be reconfigured and we know how to do that based on
> the old version (ok after reconfiguration)
>   - The coder is incompatible (error)
>
> I was wondering about the Coder evolution story in Beam. The current
> state is that checkpointed Beam pipelines are only guaranteed to run
> with the same Beam version and pipeline version. A newer version of
> either might break the checkpoint format without any way to migrate the
> state.
>
> Should we start thinking about supporting Coder evolution in Beam?
>
> Thanks,
> Max
>
>
> * Coders are called TypeSerializers in Flink land. The interface is
> TypeSerializerSnapshot.
>


Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Reuven Lax
On Wed, May 8, 2019 at 10:57 AM Rui Wang  wrote:

> Regarding DATETIME, I totally agree it should be removed as a
> primitive type, so that each language does not have to find its own time
> library (and if they could not find any, they would likely go to a logical
> type and use int64 from the Schema).
>
> I have two questions regarding to the representation:
>
> 1. There is a nullable field for FieldType. I am not an expert on
> programming languages. So does this field in the proto mean that "null" is
> common in programming languages? Or is this field really optional, so that
> if a language does not need "null", it can just ignore this field?
>

We could also call this optional instead. I used nullable because that
seemed consistent with what SQL does.

A programming language that supports optional is free to implement this
using optionals instead of null values. However, the current Beam
languages (Java, Python, Go) all support null.


>
> 2. How is the time zone dealt with?
>

DATETIME is poorly named. It's really a timestamp type, so it has no time
zone.


>
>
> -Rui
>
>
>
>
> *From: *Reuven Lax 
> *Date: *Wed, May 8, 2019 at 9:54 AM
> *To: *dev
>
> Beam Java's support for schemas is just about done: we infer schemas from
>> a variety of types, we have a variety of utility transforms (join,
>> aggregate, etc.) for schemas, and schemas are integrated with the ParDo
>> machinery. The big remaining task I'm working on is writing documentation
>> and examples for all of this so that users are aware. If you're interested,
>> these slides from
>> the London Beam meetup show a bit more how schemas can be used and how they
>> simplify the API.
>>
>> I want to start integrating schemas into portability so that they can be
>> used from other languages such as Python (in particular this will also
>> allow BeamSQL to be invoked from other languages). In order to do this, the
>> Beam portability protos must have a way of representing schemas. Since this
>> has not been discussed before, I'm starting this discussion now on the list.
>>
>> As a reminder: a schema represents the type of a PCollection as a
>> collection of fields. Each field has a name, an id (position), and a field
>> type. A field type can be either a primitive type (int, long, string, byte
>> array, etc.), a nested row (itself with a schema), an array, or a map.
>>
>> We also support logical types. A logical type is a way for the user to
>> embed their own types in schema fields. A logical type is always backed by
>> a schema type, and contains a function for mapping the user's logical type
>> to the field type. You can think of this as a generalization of a coder:
>> while a coder always maps the user type to a byte array, a logical type can
>> map to an int, or a string, or any other schema field type (in fact any
>> coder can always be used as a logical type for mapping to byte-array field
>> types). Logical types are used extensively by Beam SQL to represent SQL
>> types that have no correspondence in Beam's field types (e.g. SQL has 4
>> different date/time types). Logical types for Beam schemas have a lot of
>> similarities to AVRO logical types.
>>
>> An initial proto representation for schemas is here.
>> Before we go further with this, I would like community consensus on what
>> this representation should be. I can start by suggesting a few possible
>> changes to this representation (and hopefully others will suggest others):
>>
>>- Kenn Knowles has suggested removing DATETIME as a primitive type,
>>and instead making it a logical type backed by INT64 as this keeps our
>>primitive types closer to "classical" PL primitive types. This also allows
>>us to create multiple versions of this type - e.g. TIMESTAMP(millis),
>>TIMESTAMP(micros), TIMESTAMP(nanos).
>>- If we do the above, we can also consider removing DECIMAL and
>>making that a logical type as well.
>>- The id field is currently used for some performance optimizations
>>only. If we formalized the idea of schema types having ids, then we might
>>be able to use this to allow self-recursive schemas (self-recursive types
>>are not currently allowed).
>>- Beam Schemas currently have an ARRAY type. However Beam supports
>>"large iterables" (iterables that don't fit in memory that the runner can
>>page in), and this doesn't match well to arrays. I think we need to add an
>>ITERABLE type as well to support things like GroupByKey results.
>>
>> It would also be interesting to explore allowing well-known metadata tags
>> on fields that Beam interprets. e.g. key and value, to allow Beam to
>> interpret any two-field schema as a KV, or window and timestamp to allow
>> automatically filling those out. However this would be an extension to the
>>

Coder Evolution

2019-05-08 Thread Maximilian Michels

Hi,

I'm looking into updating the Flink Runner to Flink version 1.8. Since 
version 1.7, Flink has had a new optional interface for Coder evolution*.


When a Flink pipeline is checkpointed, CoderSnapshots are written out 
alongside the checkpointed data. When the pipeline is restored from
that checkpoint, the CoderSnapshots are restored and used to 
reinstantiate the Coders.


Furthermore, there is a compatibility and migration check between the 
old and the new Coder. This allows us to determine whether:


 - The serializer did not change or is compatible (ok)
 - The serialization format of the coder changed (ok after migration)
 - The coder needs to be reconfigured and we know how to do that based on
   the old version (ok after reconfiguration)
 - The coder is incompatible (error)

I was wondering about the Coder evolution story in Beam. The current 
state is that checkpointed Beam pipelines are only guaranteed to run 
with the same Beam version and pipeline version. A newer version of 
either might break the checkpoint format without any way to migrate the 
state.
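
To make that concrete, a Beam-side equivalent of Flink's snapshot hook could
look roughly like the sketch below (all names invented for illustration; no
such interface exists in Beam today):

  import org.apache.beam.sdk.coders.Coder;

  // Hypothetical only: a snapshot a Coder could write alongside checkpointed
  // data so that, on restore, the old and new Coder can be compared.
  public interface CoderSnapshot<T> {

    /** Serialized configuration of the Coder at checkpoint time. */
    byte[] snapshotConfiguration();

    /** Re-create the Coder that wrote the checkpointed data. */
    Coder<T> restoreCoder();

    /** The four outcomes listed above. */
    enum Compatibility {
      COMPATIBLE_AS_IS,                 // unchanged or compatible (ok)
      COMPATIBLE_AFTER_MIGRATION,       // format changed, migrate the data
      COMPATIBLE_AFTER_RECONFIGURATION, // reconfigure the new Coder first
      INCOMPATIBLE                      // error
    }

    /** Compare this snapshot against the Coder from the new pipeline version. */
    Compatibility resolveCompatibility(Coder<T> newCoder);
  }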


Should we start thinking about supporting Coder evolution in Beam?

Thanks,
Max


* Coders are called TypeSerializers in Flink land. The interface is 
TypeSerializerSnapshot.


Re: [DISCUSS] Portability representation of schemas

2019-05-08 Thread Rui Wang
Regarding DATETIME, I totally agree it should be removed as a primitive type
so that each language does not have to find its own time libraries (and if
they cannot find any, they will likely fall back to a logical type backed by
int64 from Schema).

I have two questions regarding the representation:

1. There is a nullable field on FieldType. I am not an expert in programming
languages. So does this field in the proto mean that "null" is common in
programming languages? Or is this field really optional, so that a language
that does not need "null" can just ignore it?

2. How is the time zone dealt with?


-Rui




*From: *Reuven Lax 
*Date: *Wed, May 8, 2019 at 9:54 AM
*To: *dev

Beam Java's support for schemas is just about done: we infer schemas from a
> variety of types, we have a variety of utility transforms (join, aggregate,
> etc.) for schemas, and schemas are integrated with the ParDo machinery. The
> big remaining task I'm working on is writing documentation and examples for
> all of this so that users are aware. If you're interested, these slides
> 
>  from
> the London Beam meetup show a bit more how schemas can be used and how they
> simplify the API.
>
> I want to start integrating schemas into portability so that they can be
> used from other languages such as Python (in particular this will also
> allow BeamSQL to be invoked from other languages). In order to do this, the
> Beam portability protos must have a way of representing schemas. Since this
> has not been discussed before, I'm starting this discussion now on the list.
>
> As a reminder: a schema represents the type of a PCollection as a
> collection of fields. Each field has a name, an id (position), and a field
> type. A field type can be either a primitive type (int, long, string, byte
> array, etc.), a nested row (itself with a schema), an array, or a map.
>
> We also support logical types. A logical type is a way for the user to
> embed their own types in schema fields. A logical type is always backed by
> a schema type, and contains a function for mapping the user's logical type
> to the field type. You can think of this as a generalization of a coder:
> while a coder always maps the user type to a byte array, a logical type can
> map to an int, or a string, or any other schema field type (in fact any
> coder can always be used as a logical type for mapping to byte-array field
> types). Logical types are used extensively by Beam SQL to represent SQL
> types that have no correspondence in Beam's field types (e.g. SQL has 4
> different date/time types). Logical types for Beam schemas have a lot of
> similarities to AVRO logical types.
>
> An initial proto representation for schemas is here
> .
> Before we go further with this, I would like community consensus on what
> this representation should be. I can start by suggesting a few possible
> changes to this representation (and hopefully others will suggest others):
>
>- Kenn Knowles has suggested removing DATETIME as a primitive type,
>and instead making it a logical type backed by INT64 as this keeps our
>primitive types closer to "classical" PL primitive types. This also allows
>us to create multiple versions of this type - e.g. TIMESTAMP(millis),
>TIMESTAMP(micros), TIMESTAMP(nanos).
>- If we do the above, we can also consider removing DECIMAL and making
>that a logical type as well.
>- The id field is currently used for some performance optimizations
>only. If we formalized the idea of schema types having ids, then we might
>be able to use this to allow self-recursive schemas (self-recursive types
>are not currently allowed).
>- Beam Schemas currently have an ARRAY type. However Beam supports
>"large iterables" (iterables that don't fit in memory that the runner can
>page in), and this doesn't match well to arrays. I think we need to add an
>ITERABLE type as well to support things like GroupByKey results.
>
> It would also be interesting to explore allowing well-known metadata tags
> on fields that Beam interprets. e.g. key and value, to allow Beam to
> interpret any two-field schema as a KV, or window and timestamp to allow
> automatically filling those out. However this would be an extension to the
> current schema concept and deserves a separate discussion thread IMO.
>
> I ask that we please limit this discussion to the proto representation of
> schemas. If people want to discuss (or rediscuss) other things around Beam
> schemas, I'll be happy to create separate threads for those discussions.
>
> Thank you!
>
> Reuven
>


Re: request for beam minor release

2019-05-08 Thread Kenneth Knowles
For the benefit of the thread, I will also call out our incubating LTS
(Long-Term Support) policy. For critical fixes, we will issue patch
releases on the 2.7 branch. We are currently gathering proposals for
cherry-picks to 2.7.1 to do that release. Other than that, releases are a
lot of work so we focus on a steady rate of minor version releases instead
of patching non-LTS versions.

You can read at https://beam.apache.org/community/policies/.

Kenn

On Wed, May 8, 2019 at 9:22 AM Moorhead,Richard <
richard.moorhe...@cerner.com> wrote:

> Assuming 2.13 will include or otherwise be supported by flink-runner-1.7
> then this should not be an issue.
>
> --
> *From:* Jean-Baptiste Onofré 
> *Sent:* Wednesday, May 8, 2019 10:09 AM
> *To:* dev@beam.apache.org
> *Subject:* Re: request for beam minor release
>
> I second Max here. If you are just looking for this specific commit, you
> can take a next release that will include it.
>
> Regards
> JB
>
> On 08/05/2019 16:27, Maximilian Michels wrote:
> > Hi Richard,
> >
> > Would it be an option to use the upcoming 2.13.0 release? The commit
> > will be part of that release.
> >
> > Thanks,
> > Max
> >
> > On 08.05.19 15:43, Jean-Baptiste Onofré wrote:
> >> Hi,
> >>
> >> All releases are tagged. We create a branch based on a master commit.
> >>
> >> Are you requesting 2.10.1 maintenance release ?
> >>
> >> Regards
> >> JB
> >>
> >> On 08/05/2019 15:10, Moorhead,Richard wrote:
> >>> Is there a process for tagging a commit in master for a minor release?
> >>>
> >>> I am trying to get this
> >>> <
> https://github.com/apache/beam/pull/8503/commits/ffa5632bca8c7264993702c39c6ca013a9f6ecdb
> > commit
> >>>
> >>> released into 2.10.1
> >>>
> >>>
> >>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>


Re: Kotlin iterator error

2019-05-08 Thread Kenneth Knowles
Filed https://issues.apache.org/jira/browse/BEAM-7247 and assigned
initially to +Reuven Lax  who is knowledgeable about
the @Element annotation and analysis.
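
For context, the construct that trips this up is the DoFn @Element parameter;
the Java equivalent of what the reporter is writing looks roughly like this
(an illustrative sketch, not the reporter's actual code):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // A DoFn whose @Element parameter carries an Iterable value, as produced by
  // GroupByKey. The reported Kotlin failure appears to come from analyzing a
  // signature like this one.
  class FormatCounts extends DoFn<KV<String, Iterable<Long>>, String> {
    @ProcessElement
    public void processElement(
        @Element KV<String, Iterable<Long>> counts, OutputReceiver<String> out) {
      long total = 0;
      for (Long c : counts.getValue()) {
        total += c;
      }
      out.output(counts.getKey() + ": " + total);
    }
  }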

Kenn

On Wed, May 8, 2019 at 2:40 AM Maximilian Michels  wrote:

> Hi Ankur,
>
> I've left a comment. Looking at the stack trace [1], this looks like a
> problem with our Reflection analysis.
>
> -Max
>
> [1]
>
> https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-stack-trace-txt
>
> On 04.05.19 00:56, Ankur Goenka wrote:
> > Hi,
> >
> > A Beam user on Stack Overflow has posted an issue while using the Kotlin SDK.
> >
> https://stackoverflow.com/questions/55908999/kotlin-iterable-not-supported-in-apache-beam/55911859#55911859
> > I am not very familiar with kotlin so can someone please take a look.
> >
> > Thanks,
> > Ankur
>


Requesting contributor permission for Beam JIRA tickets

2019-05-08 Thread Ajo Thomas
Hello,

I am Ajo Thomas and I was hoping to work on making some improvements to the
Beam KinesisIO Java SDK.
I have created a ticket for it [
https://issues.apache.org/jira/browse/BEAM-7240 ] and was hoping to assign
it to myself.

Requesting the admins to please add me as a contributor for Beam's JIRA
issue tracker.
My ASF JIRA Id: ajo.thomas24

Thanks,
Ajo Thomas





Re: Artifact staging in cross-language pipelines

2019-05-08 Thread Chamikara Jayalath
On Tue, May 7, 2019 at 10:21 AM Maximilian Michels  wrote:

> Here's the first draft:
>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>
> It's rather high-level. We may want to add more details once we have
> finalized the design. Feel free to make comments and edits.
>

Thanks Max. Added some comments.


>
> > All of this goes back to the idea that I think the listing of
> > artifacts (or more general dependencies) should be a property of the
> > environments themselves.
>
> +1 I came to the same conclusion while thinking about how to store
> artifact information for deferred execution of the pipeline.
>
> -Max
>
> On 07.05.19 18:10, Robert Bradshaw wrote:
> > Looking forward to your writeup, Max. In the meantime, some comments
> below.
> >
> >
> > From: Lukasz Cwik 
> > Date: Thu, May 2, 2019 at 6:45 PM
> > To: dev
> >
> >>
> >>
> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw 
> wrote:
> >>>
> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik  wrote:
> 
>  We should stick with URN + payload + artifact metadata[1] where the
> only mandatory one that all SDKs and expansion services understand is the
> "bytes" artifact type. This allows us to add optional URNs for file://,
> http://, Maven, PyPi, ... in the future. I would make the artifact
> staging service use the same URN + payload mechanism to get compatibility
> of artifacts across the different services and also have the artifact
> staging service be able to be queried for the list of artifact types it
> supports.
> >>>
> >>> +1
> >>>
>  Finally, we would need to have environments enumerate the artifact
> types that they support.
> >>>
> >>> Meaning at runtime, or as another field statically set in the proto?
> >>
> >>
> >> I don't believe runners/SDKs should have to know what artifacts each
> environment supports at runtime and instead have environments enumerate
> them explicitly in the proto. I have been thinking about a more general
> "capabilities" block on environments which allow them to enumerate URNs
> that the environment understands. This would include artifact type URNs,
> PTransform URNs, coder URNs, ... I haven't proposed anything specific down
> this line yet because I was wondering how environment resources (CPU, min
> memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could
> tie into this.
> >>
> >>>
>  Having everyone have the same "artifact" representation would be
> beneficial since:
>  a) Python environments could install dependencies from a
> requirements.txt file (something that the Google Cloud Dataflow Python
> docker container allows for today)
>  b) It provides an extensible and versioned mechanism for SDKs,
> environments, and artifact staging/retrieval services to support additional
> artifact types
>  c) Allow for expressing a canonical representation of an artifact
> like a Maven package so a runner could merge environments that the runner
> deems compatible.
> 
>  The flow I could see is:
>  1) (optional) query artifact staging service for supported artifact
> types
>  2) SDK request expansion service to expand transform passing in a
> list of artifact types the SDK and artifact staging service support, the
> expansion service returns a list of artifact types limited to those
> supported types + any supported by the environment
> >>>
> >>> The crux of the issue seems to be how the expansion service returns
> >>> the artifacts themselves. Is this going with the approach that the
> >>> caller of the expansion service must host an artifact staging service?
> >>
> >>
> >> The caller would not need to host an artifact staging service (but
> would become effectively a proxy service, see my comment below for more
> details) as I would have expected this to be part of the expansion service
> response.
> >>
> >>>
> >>> There is also the question here is how the returned artifacts get
> >>> attached to the various environments, or whether they get implicitly
> >>> applied to all returned stages (which need not have a consistent
> >>> environment)?
> >>
> >>
> >> I would suggest returning additional information that says what
> artifact is for which environment. Applying all artifacts to all
> environments is likely to cause issues since some environments may not
> understand certain artifact types or may get conflicting versions of
> artifacts. I would see this happening since an expansion service that
> aggregates other expansion services seems likely, for example:
> >>   /-> ExpansionService(Python)
> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >>   \-> ExpansionService(Go)
> >
> > All of this goes back to the idea that I think the listing of
> > artifacts (or more general dependencies) should be a property of the
> > environments themselves.
> >
>  3) SDK converts any artifact types that the artifact staging service
> or environment doesn'

[DISCUSS] Portability representation of schemas

2019-05-08 Thread Reuven Lax
Beam Java's support for schemas is just about done: we infer schemas from a
variety of types, we have a variety of utility transforms (join, aggregate,
etc.) for schemas, and schemas are integrated with the ParDo machinery. The
big remaining task I'm working on is writing documentation and examples for
all of this so that users are aware. If you're interested, these slides

from
the London Beam meetup show a bit more how schemas can be used and how they
simplify the API.

I want to start integrating schemas into portability so that they can be
used from other languages such as Python (in particular this will also
allow BeamSQL to be invoked from other languages). In order to do this, the
Beam portability protos must have a way of representing schemas. Since this
has not been discussed before, I'm starting this discussion now on the list.

As a reminder: a schema represents the type of a PCollection as a
collection of fields. Each field has a name, an id (position), and a field
type. A field type can be either a primitive type (int, long, string, byte
array, etc.), a nested row (itself with a schema), an array, or a map.
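
For concreteness, here is a small example of such a schema and a matching Row,
using the Java API roughly as it exists today (treat the exact method names as
a from-memory sketch):

  import java.util.Arrays;
  import java.util.Collections;
  import org.apache.beam.sdk.schemas.Schema;
  import org.apache.beam.sdk.schemas.Schema.FieldType;
  import org.apache.beam.sdk.values.Row;

  public class SchemaExample {
    public static void main(String[] args) {
      // Nested row type used by the "address" field.
      Schema address =
          Schema.builder().addStringField("city").addStringField("country").build();

      // A schema mixing primitive, row, array, and map field types.
      Schema user =
          Schema.builder()
              .addInt64Field("id")
              .addStringField("name")
              .addRowField("address", address)
              .addArrayField("tags", FieldType.STRING)
              .addMapField("attributes", FieldType.STRING, FieldType.STRING)
              .build();

      Row row =
          Row.withSchema(user)
              .addValues(
                  1L,
                  "alice",
                  Row.withSchema(address).addValues("Bratislava", "SK").build(),
                  Arrays.asList("beam", "schemas"),
                  Collections.singletonMap("tier", "free"))
              .build();
      System.out.println(row);
    }
  }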

We also support logical types. A logical type is a way for the user to
embed their own types in schema fields. A logical type is always backed by
a schema type, and contains a function for mapping the user's logical type
to the field type. You can think of this as a generalization of a coder:
while a coder always maps the user type to a byte array, a logical type can
map to an int, or a string, or any other schema field type (in fact any
coder can always be used as a logical type for mapping to byte-array field
types). Logical types are used extensively by Beam SQL to represent SQL
types that have no correspondence in Beam's field types (e.g. SQL has 4
different date/time types). Logical types for Beam schemas have a lot of
similarities to AVRO logical types.
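
As a sketch of the idea (deliberately not the exact Java interface), a
timestamp-millis logical type backed by INT64 could look like this:

  import java.time.Instant;

  // Hypothetical sketch of a logical type: the user sees an Instant, while the
  // schema carries it as a plain INT64 (millis since epoch). The real Beam
  // interface differs in details; this only illustrates the mapping.
  public class TimestampMillisLogicalType {

    /** Identifier (URN) naming the logical type across SDKs -- illustrative. */
    public String identifier() {
      return "beam:logical_type:timestamp_millis:v1";
    }

    /** The schema field type that physically backs the logical type. */
    public String baseTypeName() {
      return "INT64";
    }

    /** User type -> base type. */
    public long toBaseType(Instant timestamp) {
      return timestamp.toEpochMilli();
    }

    /** Base type -> user type. */
    public Instant toInputType(long millis) {
      return Instant.ofEpochMilli(millis);
    }
  }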

An initial proto representation for schemas is here
.
Before we go further with this, I would like community consensus on what
this representation should be. I can start by suggesting a few possible
changes to this representation (and hopefully others will suggest others):

   - Kenn Knowles has suggested removing DATETIME as a primitive type, and
   instead making it a logical type backed by INT64 as this keeps our
   primitive types closer to "classical" PL primitive types. This also allows
   us to create multiple versions of this type - e.g. TIMESTAMP(millis),
   TIMESTAMP(micros), TIMESTAMP(nanos).
   - If we do the above, we can also consider removing DECIMAL and making
   that a logical type as well.
   - The id field is currently used for some performance optimizations
   only. If we formalized the idea of schema types having ids, then we might
   be able to use this to allow self-recursive schemas (self-recursive types
   are not currently allowed).
   - Beam Schemas currently have an ARRAY type. However Beam supports
   "large iterables" (iterables that don't fit in memory that the runner can
   page in), and this doesn't match well to arrays. I think we need to add an
   ITERABLE type as well to support things like GroupByKey results.

It would also be interesting to explore allowing well-known metadata tags
on fields that Beam interprets. e.g. key and value, to allow Beam to
interpret any two-field schema as a KV, or window and timestamp to allow
automatically filling those out. However this would be an extension to the
current schema concept and deserves a separate discussion thread IMO.

I ask that we please limit this discussion to the proto representation of
schemas. If people want to discuss (or rediscuss) other things around Beam
schemas, I'll be happy to create separate threads for those discussions.

Thank you!

Reuven


Re: Python SDK timestamp precision

2019-05-08 Thread Kenneth Knowles
This got pretty long, but I don't yet want to end it, because there's not
quite yet a solution that will allow a user to treat timestamps from most
systems as Beam timestamps.

I'm cutting pieces just to make inline replies easier to read.

On Tue, Apr 23, 2019 at 9:03 AM Robert Bradshaw  wrote:

> On Tue, Apr 23, 2019 at 4:20 PM Kenneth Knowles  wrote:
> >  -  WindowFn must receive exactly the data that came from the user's
> data source. So that cannot be rounded.
> >  - The user's WindowFn assigns to a window, so it can contain arbitrary
> precision as it should be grouped as bytes.
> >  - End of window, timers, watermark holds, etc, are all treated only as
> bounds, so can all be rounded based on their use as an upper or lower bound.
> >
> > We already do a lot of this - Pubsub publish timestamps are microsecond
> precision (you could say our current connector constitutes data loss) as
> are Windmill timestamps (since these are only combines of Beam timestamps
> here there is no data loss). There are undoubtedly some corner cases I've
> missed, and naively this might look like duplicating timestamps so that
> could be an unacceptable performance concern.
>
> If I understand correctly, in this scheme WindowInto assignment is
> paramaterized by a function that specifies how to parse/extract the
> timestamp from the data element (maybe just a field specifier for
> schema'd data) rather than store the (exact) timestamp in a standard
> place in the WindowedValue, and the window merging always goes back to
> the SDK rather than the possibility of it being handled runner-side.
>

This sounds promising. You could also store the extracted approximate
timestamp somewhere, of course.

Even if the runner doesn't care about interpreting the window, I think
> we'll want to have compatible window representations (and timestamp
> representations, and windowing fns) across SDKs (especially for
> cross-language) which favors choosing a consistent resolution.

The end-of-window, for firing, can be approximate, but it seems it
> should be exact for timestamp assignment of the result (and similarly
> with the other timestamp combiners).
>

I was thinking that the window itself should be stored as exact data, while
just the firing itself is approximated, since it already is, because of
watermarks and timers. You raise a good point that min/max timestamp
combiners require actually understanding the higher-precision timestamp. I
can think of a couple things to do. One is the old "standardize all 3 or 4
precisions we need" and the other is that combiners other than EOW
exist primarily to hold the watermark, and that hold does not require the
original precision. Still, neither of these is that satisfying.


> > A correction: Java *now* uses nanoseconds [1]. It uses the same
> breakdown as proto (int64 seconds since epoch + int32 nanos within second).
> It has legacy classes that use milliseconds, and Joda itself now encourages
> moving back to Java's new Instant type. Nanoseconds should complicate the
> arithmetic only for the one person authoring the date library, which they
> have already done.
>
> The encoding and decoding need to be done in a language-consistent way
> as well.


I honestly am not sure what you mean by "language-consistent" here.

Also, most date libraries don't have division, etc. operators, so
> we have to do that as well. Not that it should be *that* hard.


If the libraries dedicated to time handling haven't found it needful, is
there a specific reason you raise this? We do some simple math to find the
window things fall into; is that it?
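
Concretely, fixed-window assignment is essentially the arithmetic below
(sketched here with nanosecond instants; this is not Beam's actual FixedWindows
code):

  import java.time.Duration;
  import java.time.Instant;

  public class FixedWindowMath {
    /** Start of the fixed window of the given size containing the timestamp. */
    static Instant windowStart(Instant ts, Duration size) {
      long sizeNanos = size.toNanos();
      // Flatten to nanos since epoch (ignoring overflow for far-future times).
      long tsNanos = ts.getEpochSecond() * 1_000_000_000L + ts.getNano();
      long startNanos = tsNanos - Math.floorMod(tsNanos, sizeNanos);
      return Instant.ofEpochSecond(
          Math.floorDiv(startNanos, 1_000_000_000L),
          Math.floorMod(startNanos, 1_000_000_000L));
    }

    public static void main(String[] args) {
      Instant ts = Instant.parse("2019-05-08T12:34:56.789Z");
      System.out.println(windowStart(ts, Duration.ofMinutes(1)));
      // Prints 2019-05-08T12:34:00Z
    }
  }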


> >> It would also be really nice to clean up the infinite-future being the
> >> somewhat arbitrary max micros rounded to millis, and
> >> end-of-global-window being infinite-future minus 1 hour (IIRC), etc.
> >> as well as the ugly logic in Python to cope with millis-micros
> >> conversion.
> >
> > I actually don't have a problem with this. If you are trying to keep the
> representation compact, not add bytes on top of instants, then you just
> have to choose magic numbers, right?
>
> It's not about compactness, it's the (historically-derived?)
> arbitrariness of the numbers.


What I mean is that the only reason to fit them into an integer at all is
compactness. Otherwise, you could use a proper disjoint union representing
your intent directly, and all fiddling goes away, like `Timestamp ::=
PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a
couple of bits.
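
In Java that disjoint union could be sketched along these lines (illustrative
only, not a proposal for an actual class):

  import java.time.Instant;

  // Sketch of Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant).
  public abstract class BeamTimestamp {
    public static final BeamTimestamp POS_INF = new BeamTimestamp() {};
    public static final BeamTimestamp NEG_INF = new BeamTimestamp() {};
    public static final BeamTimestamp END_OF_GLOBAL_WINDOW = new BeamTimestamp() {};

    /** The only variant that carries a real instant. */
    public static final class ActualTime extends BeamTimestamp {
      private final Instant instant;

      private ActualTime(Instant instant) {
        this.instant = instant;
      }

      public Instant instant() {
        return instant;
      }
    }

    public static BeamTimestamp of(Instant instant) {
      return new ActualTime(instant);
    }

    private BeamTimestamp() {}
  }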

Kenn

For example, the bounds are chosen to
> fit within 64-bit mircos despite milliseconds being the "chosen"
> granularity, and care was taken that
>
> WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>
> works, but
>
> WindowInto(Global) | GBK | WindowInto(Day) | GBK
>
> may produce elements with timestamps greater than MaxTimestamp.
>
> >
> > Kenn
> >
> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
> >
> >>
> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Bur

Re: [DISCUSS] Backwards compatibility of @Experimental features

2019-05-08 Thread Ahmet Altay
*From: *Kenneth Knowles 
*Date: *Wed, May 8, 2019 at 9:24 AM
*To: *dev


>
> On Fri, Apr 19, 2019 at 3:09 AM Ismaël Mejía  wrote:
>
>> It seems we mostly agree that @Experimental is important, and that API
>> changes (removals) on experimental features should happen quickly but still
>> give some time to users so the Experimental purpose is not lost.
>>
>> Ahmet's proposal, given our current release calendar, is close to 2 releases.
>> Can we settle this on 2 releases as a 'minimum time' before removal? (This
>> will give maintainers the option to support it for longer if they want, as
>> discussed in the related KafkaIO thread, while still being friendly to
>> users.)
>>
>> Do we agree?
>>
>
> This sounds pretty good to me.
>

Sounds good to me too.


> How can we manage this? Right now we tie most activities (like re-triaging
> flakes) to the release process, since it is the only thing that happens
> regularly for the community. If we don't have some forcing function then I expect
> the whole thing will just be forgotten.
>

Can we pre-create a list of future releases in JIRA, and for each
experimental feature require that a JIRA issue is created for resolving the
experimental status and tag it with the release that will happen after the
minimum time period?


>
> Kenn
>
>
>>
>> Note: for the other subjects (e.g. when an Experimental feature should
>> become not experimental) I think we will hardly find an agreement so I
>> think this should be treated in a per case basis by the maintainers, but if
>> you want to follow up on that discussion we can open another thread for
>> this.
>>
>>
>>
>> On Sat, Apr 6, 2019 at 1:04 AM Ahmet Altay  wrote:
>>
>>> I agree that Experimental feature is still very useful. I was trying to
>>> argue that we diluted its value so +1 to reclaim that.
>>>
>>> Back to the original question, in my opinion removing existing
>>> "experimental and deprecated" features in n=1 release will confuse users.
>>> This will likely be a surprise to them because we have been maintaining
>>> this state release after release now. I would propose in the next release
>>> warning users of such a change happening and give them at least 3 months to
>>> upgrade to suggested newer paths. In the future we can have shorter
>>> timelines, assuming that we will set the user expectations right.
>>>
>>> On Fri, Apr 5, 2019 at 3:01 PM Ismaël Mejía  wrote:
>>>
 I agree 100% with Kenneth on the multiple advantages that the
 Experimental feature gave us. I also can count multiple places where this
 has been essential in other modules than core. I disagree on the fact that
 the @Experimental annotation has lost sense, it is simply ill defined, and
 probably it is by design because its advantages come from it.

 Most of the topics in this thread are a consequence of the this loose
 definition, e.g. (1) not defining how a feature becomes stable, and (2)
 what to do when we want to remove an experimental feature, are ideas that
 we need to decide if we define just continue to handle as we do today.

 Defining a target for graduating an Experimental feature is a bit too
 aggressive with not much benefit, in this case we could be losing the
 advantages of Experimental (save if we could change the proposed version in
 the future). This probably makes sense for the removal of features but
 makes less sense to decide when some feature becomes stable. Of course in
 the case of the core SDKs packages this is probably more critical but
 nothing guarantees that things will be ready when we expect to. When will
 we tag for stability things like SDF or portability APIs?. We cannot
 predict the future for completion of features.

 Nobody has mentioned the LTS releases couldn’t be these like the middle
 points for these decisions? That at least will give LTS some value because
 so far I still have issues to understand the value of this idea given that
 we can do a minor release of any pre-released version.

 This debate is super important and nice to have, but we lost focus on
 my initial question. I like the proposal to remove a deprecated
 experimental feature (or part of it) after one release, in particular if
 the feature has a clear replacement path, however for cases like the
 removal of previously supported versions of Kafka one release may be too
 short. Other opinions on this? (or the other topics).

 On Fri, Apr 5, 2019 at 10:52 AM Robert Bradshaw 
 wrote:

> if it's technically feasible, I am also in favor of requiring
> experimental features to be (per-tag, Python should be updated) opt-in
> only. We should probably regularly audit the set of experimental features
> we ship (I'd say as part of the release, but that process is laborious
> enough, perhaps we should do it on a half-release cycle?) I think imposing
> hard deadlines (chosen when a feature is intro

Plans for Python type hints

2019-05-08 Thread Udi Meiri
Hi,
I've written a document, with input from robertwb@, detailing the direction
forward I want to take type hints in Python 3. The document contains
background, a survey of existing type tools, and example usage.
The summary of proposed changes is:


   1. Update Beam's type hinting support to work with Python 3, with minimal
      changes and keeping backwards compatibility.
      1. Support Py3 type hints.
      2. Fix trivial_inference module to work with Py3 bytecode.
   2. Migrate to standard typing module types, to make it easier to migrate to
      using external packages later on.
   3. Start using external typing packages to simplify maintenance and add
      features (such as better inference).


Any comments would be welcome here or on the doc.

doc:
https://docs.google.com/document/d/15bsOL3YcUWuIjnxqhi9nanhj2eh9S6-QlLYuL7ufcXY/edit?usp=sharing
JIRA: https://issues.apache.org/jira/browse/BEAM-7060




Re: [DISCUSS] Backwards compatibility of @Experimental features

2019-05-08 Thread Kenneth Knowles
On Fri, Apr 19, 2019 at 3:09 AM Ismaël Mejía  wrote:

> It seems we mostly agree that @Experimental is important, and that API
> changes (removals) on experimental features should happen quickly but still
> give some time to users so the Experimental purpose is not lost.
>
> Ahmet's proposal, given our current release calendar, is close to 2 releases.
> Can we settle this on 2 releases as a 'minimum time' before removal? (This
> will give maintainers the option to support it for longer if they want, as
> discussed in the related KafkaIO thread, while still being friendly to
> users.)
>
> Do we agree?
>

This sounds pretty good to me. How can we manage this? Right now we tie
most activities (like re-triaging flakes) to the release process, since it
is the only thing that happens regularly for the community. If we don't
have some forcing function, then I expect the whole thing will just be forgotten.

Kenn


>
> Note: for the other subjects (e.g. when an Experimental feature should
> become not experimental) I think we will hardly find an agreement so I
> think this should be treated in a per case basis by the maintainers, but if
> you want to follow up on that discussion we can open another thread for
> this.
>
>
>
> On Sat, Apr 6, 2019 at 1:04 AM Ahmet Altay  wrote:
>
>> I agree that Experimental feature is still very useful. I was trying to
>> argue that we diluted its value so +1 to reclaim that.
>>
>> Back to the original question, in my opinion removing existing
>> "experimental and deprecated" features in n=1 release will confuse users.
>> This will likely be a surprise to them because we have been maintaining
>> this state release after release now. I would propose in the next release
>> warning users of such a change happening and give them at least 3 months to
>> upgrade to suggested newer paths. In the future we can have shorter
>> timelines, assuming that we will set the user expectations right.
>>
>> On Fri, Apr 5, 2019 at 3:01 PM Ismaël Mejía  wrote:
>>
>>> I agree 100% with Kenneth on the multiple advantages that the
>>> Experimental feature gave us. I also can count multiple places where this
>>> has been essential in other modules than core. I disagree on the fact that
>>> the @Experimental annotation has lost sense, it is simply ill defined, and
>>> probably it is by design because its advantages come from it.
>>>
>>> Most of the topics in this thread are a consequence of the this loose
>>> definition, e.g. (1) not defining how a feature becomes stable, and (2)
>>> what to do when we want to remove an experimental feature, are ideas that
>>> we need to decide if we define just continue to handle as we do today.
>>>
>>> Defining a target for graduating an Experimental feature is a bit too
>>> aggressive with not much benefit, in this case we could be losing the
>>> advantages of Experimental (save if we could change the proposed version in
>>> the future). This probably makes sense for the removal of features but
>>> makes less sense to decide when some feature becomes stable. Of course in
>>> the case of the core SDKs packages this is probably more critical but
>>> nothing guarantees that things will be ready when we expect to. When will
>>> we tag for stability things like SDF or portability APIs?. We cannot
>>> predict the future for completion of features.
>>>
>>> Nobody has mentioned the LTS releases couldn’t be these like the middle
>>> points for these decisions? That at least will give LTS some value because
>>> so far I still have issues to understand the value of this idea given that
>>> we can do a minor release of any pre-released version.
>>>
>>> This debate is super important and nice to have, but we lost focus on my
>>> initial question. I like the proposal to remove a deprecated  experimental
>>> feature (or part of it) after one release, in particular if the feature has
>>> a clear replacement path, however for cases like the removal of previously
>>> supported versions of Kafka one release may be too short. Other opinions on
>>> this? (or the other topics).
>>>
>>> On Fri, Apr 5, 2019 at 10:52 AM Robert Bradshaw 
>>> wrote:
>>>
 if it's technically feasible, I am also in favor of requiring
 experimental features to be (per-tag, Python should be updated) opt-in
 only. We should probably regularly audit the set of experimental features
 we ship (I'd say as part of the release, but that process is laborious
 enough, perhaps we should do it on a half-release cycle?) I think imposing
 hard deadlines (chosen when a feature is introduced) is too extreme, but
 might be valuable if opt-in plus regular audit is insufficient.

 On Thu, Apr 4, 2019 at 5:28 AM Kenneth Knowles  wrote:

> This all makes me think that we should rethink how we ship
> experimental features. My experience is also that (1) users don't know if
> something is experimental or don't think hard about it and (2) we don't 
> use
> experimental time period to gather f

Re: request for beam minor release

2019-05-08 Thread Moorhead,Richard
Assuming 2.13 will include or otherwise be supported by flink-runner-1.7 then 
this should not be an issue.


From: Jean-Baptiste Onofré 
Sent: Wednesday, May 8, 2019 10:09 AM
To: dev@beam.apache.org
Subject: Re: request for beam minor release

I second Max here. If you are just looking for this specific commit, you
can take a next release that will include it.

Regards
JB

On 08/05/2019 16:27, Maximilian Michels wrote:
> Hi Richard,
>
> Would it be an option to use the upcoming 2.13.0 release? The commit
> will be part of that release.
>
> Thanks,
> Max
>
> On 08.05.19 15:43, Jean-Baptiste Onofré wrote:
>> Hi,
>>
> >> All releases are tagged. We create a branch based on a master commit.
>>
>> Are you requesting 2.10.1 maintenance release ?
>>
>> Regards
>> JB
>>
>> On 08/05/2019 15:10, Moorhead,Richard wrote:
>>> Is there a process for tagging a commit in master for a minor release?
>>>
>>> I am trying to get this
>>> 
>>>  commit
>>>
>>> released into 2.10.1
>>>
>>>
>>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com





Re: request for beam minor release

2019-05-08 Thread Jean-Baptiste Onofré
I second Max here. If you are just looking for this specific commit, you
can take a next release that will include it.

Regards
JB

On 08/05/2019 16:27, Maximilian Michels wrote:
> Hi Richard,
> 
> Would it be an option to use the upcoming 2.13.0 release? The commit
> will be part of that release.
> 
> Thanks,
> Max
> 
> On 08.05.19 15:43, Jean-Baptiste Onofré wrote:
>> Hi,
>>
>> All releases are tagged. We create a branch based on a master commit.
>>
>> Are you requesting 2.10.1 maintenance release ?
>>
>> Regards
>> JB
>>
>> On 08/05/2019 15:10, Moorhead,Richard wrote:
>>> Is there a process for tagging a commit in master for a minor release?
>>>
>>> I am trying to get this
>>> 
>>>  commit
>>>
>>> released into 2.10.1
>>>  
>>>
>>

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: request for beam minor release

2019-05-08 Thread Maximilian Michels

Hi Richard,

Would it be an option to use the upcoming 2.13.0 release? The commit 
will be part of that release.


Thanks,
Max

On 08.05.19 15:43, Jean-Baptiste Onofré wrote:

Hi,

All releases are tagged. We create a branch based on a master commit.

Are you requesting 2.10.1 maintenance release ?

Regards
JB

On 08/05/2019 15:10, Moorhead,Richard wrote:

Is there a process for tagging a commit in master for a minor release?

I am trying to get this

 commit
released into 2.10.1
  







Re: request for beam minor release

2019-05-08 Thread Jean-Baptiste Onofré
Hi,

All releases are tagged. We create a branch based on a master commit.

Are you requesting 2.10.1 maintenance release ?

Regards
JB

On 08/05/2019 15:10, Moorhead,Richard wrote:
> Is there a process for tagging a commit in master for a minor release?
> 
> I am trying to get this
> 
>  commit
> released into 2.10.1
>  
> 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


request for beam minor release

2019-05-08 Thread Moorhead,Richard
Is there a process for tagging a commit in master for a minor release?

I am trying to get 
this
 commit released into 2.10.1




Re: Kotlin iterator error

2019-05-08 Thread Maximilian Michels

Hi Ankur,

I've left a comment. Looking at the stack trace [1], this looks like a 
problem with our Reflection analysis.


-Max

[1] 
https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-stack-trace-txt


On 04.05.19 00:56, Ankur Goenka wrote:

Hi,

A Beam user on Stack Overflow has posted an issue while using the Kotlin SDK.
https://stackoverflow.com/questions/55908999/kotlin-iterable-not-supported-in-apache-beam/55911859#55911859
I am not very familiar with kotlin so can someone please take a look.

Thanks,
Ankur