Re: SparkRunner Combine.perKey performance

2019-06-17 Thread Jan Lukavský
> It depends on the typical number of elements per window. E.g.
> consider sliding windows with a size of one hour and a periodicity of
> one minute. If we have one datum per hour, better to not explode. If we
> have one datum per minute, it's a wash. If we have one datum per second,
> much better to explode.


I'm not sure I follow. I'd say it works the opposite way. If there is one
record per second, then exploding windows before the shuffle would mean we
have to shuffle 60x more data. That becomes critical as more and more
data is added, so exploding a dataset with 1000s of records per second
will become really bad.
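
For concreteness, here is a minimal sketch of the windowing under discussion
using Beam's Java API (the SlidingWindowSketch class and the input
PCollection are hypothetical):

    import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class SlidingWindowSketch {
      // With a one-hour size and a one-minute period, every element is
      // assigned to size/period = 60 windows, so exploding windows before
      // the shuffle multiplies the shuffled data roughly 60x.
      static <T> PCollection<T> applySlidingWindows(PCollection<T> input) {
        return input.apply(Window.into(
            SlidingWindows.of(Duration.standardHours(1))
                .every(Duration.standardMinutes(1))));
      }
    }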


Either way, these are runtime conditions that are not known in advance,
so they cannot be used in pipeline translation.


I have created a JIRA issue [1] to track this and will try some experiments.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7574

On 6/14/19 1:42 PM, Robert Bradshaw wrote:

On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský  wrote:

  > Interesting. However, there we should never need to sort the windows
of the input, only the set of live windows (of which there may be any
number regardless of whether WindowFn does singleton assignments, and
only then in the merging case).

Ack, however the current implementation creates the accumulator from the
input and therefore sorts windows for all elements. Moreover, it doesn't
distinguish between merging and non-merging windows.

Yes, creating the accumulator from the input (and then merging the created
accumulators) is where the insane overhead comes from. We should not use
that pattern.
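
To make the asymmetry concrete, here is a sketch of the two patterns against
Beam's Java CombineFn contract (the CombinePatterns helper and its method
names are made up for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Combine.CombineFn;

    class CombinePatterns {
      // Cheap: one accumulator per key/window; each element is folded in
      // directly via addInput.
      static <I, A, O> A foldDirectly(CombineFn<I, A, O> fn, Iterable<I> values) {
        A acc = fn.createAccumulator();
        for (I value : values) {
          acc = fn.addInput(acc, value);
        }
        return acc;
      }

      // Expensive: a fresh accumulator per element, then one big merge --
      // the pattern identified above as the source of the overhead.
      static <I, A, O> A accumulatorPerElement(CombineFn<I, A, O> fn, Iterable<I> values) {
        List<A> accs = new ArrayList<>();
        for (I value : values) {
          accs.add(fn.addInput(fn.createAccumulator(), value));
        }
        return fn.mergeAccumulators(accs);
      }
    }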


  > This seems like a merging vs. non-merging choice, not a
single-vs-multiple window choice.

I'd say the condition should be non-merging && not many windows per
element, because otherwise it makes sense to group the windows even though
it is non-merging windowing (sliding with a small slide step). Otherwise
we would explode the data too much.

It depends on the typical number of elements per window. E.g. consider
sliding windows with a size of one hour and a periodicity of one
minute. If we have one datum per hour, better to not explode. If we
have one datum per minute, it's a wash. If we have one datum per
second, much better to explode.


On 6/14/19 12:19 PM, Robert Bradshaw wrote:

On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský  wrote:

Hi Robert,

thanks for the discussion. I will create a JIRA with a summary of this.
Some comments inline.

Jan

On 6/14/19 10:49 AM, Robert Bradshaw wrote:

On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský  wrote:

On 6/13/19 6:10 PM, Robert Bradshaw wrote:

On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský  wrote:

On 6/13/19 4:31 PM, Robert Bradshaw wrote:

The comment fails to take into account the asymmetry between calling addInput 
vs. mergeAccumulators. It also focuses a lot on the asymptotic behavior, when 
the most common behavior is likely having a single (global) window.

Yes, that occurred to me too. There are more questions here:

a) it would help if WindowedValue#explodeWindows returned a List instead
of an Iterable, because then optimizations would be possible based on the
number of windows (e.g. when there is only a single window, there is no need
to sort anything). This should be a simple change, as it already is a List.

Well, I'm wary of this change, but we could always create a list out of it (via 
cast or ImmutableList.copyOf) if we needed.

Why so? I thought this would be the least objectionable change, because it
actually *is* a List, and there is no interface; it is just a public method
that needs to be changed to state the fact correctly. A Collection would be
the same. Iterable is for cases where you don't exactly know if the data is
stored in memory or loaded from somewhere else, and hence the size cannot be
determined in advance. This is not the case for WindowFn.

It constrains us in that if WindowFn returns an iterable and we want
to store it as such we can no longer do so. I'm also not seeing what
optimization there is here--the thing we'd want to sort is the set of
existing windows (plus perhaps this one). Even if we wanted to do the
sort here, sort of a 1-item list should be insanely cheap.

Agreed, it should probably suffice to do this on the WindowFn. Then
there is no need to retrieve the number of windows for an element, because
what actually matters most is whether the count is == 1 or > 1. The
WindowFn can give such information. Sorting a 1-item list, on the other
hand, is not as cheap as it might look. It invokes TimSort and does
some calculations that appeared in my CPU profile quite often.
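
A minimal sketch of the guard this suggests (plain Java, not the actual
runner code):

    import java.util.Comparator;
    import java.util.List;

    class SortGuard {
      // Sorting a singleton list still pays for the array copy and the
      // TimSort entry path; a size check skips all of that in the common
      // one-window-per-element case.
      static <T> void sortIfNeeded(List<T> windows, Comparator<? super T> cmp) {
        if (windows.size() > 1) {
          windows.sort(cmp);
        }
      }
    }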

Interesting. However, there we should never need to sort the windows
of the input, only the set of live windows (of which there may be any
number regardless of whether WindowFn does singleton assignments, and
only then in the merging case).


b) I'm a little confused about why it is better to keep a key with all its
windows in a single WindowedValue in general. My first thought would really
be: explode all windows t

Re: SparkRunner Combine.perKey performance

2019-06-17 Thread Robert Bradshaw
On Mon, Jun 17, 2019 at 10:46 AM Jan Lukavský  wrote:

>  > It depends on the typical number of elements per window. E.g.
> consider sliding windows with a size of one hour and a periodicity of
> one minute. If we have one datum per hour, better to not explode. If we
> have one datum per minute, it's a wash. If we have one datum per second,
> much better to explode.
>
> I'm not sure I follow. I'd say it works the opposite way. If there is one
> record per second, then exploding windows before the shuffle would mean we
> have to shuffle 60x more data. That becomes critical as more and more
> data is added, so exploding a dataset with 1000s of records per second
> will become really bad.
>

I was referring specifically to the case where there's a CombineFn, so we'd
be able to do more of the combining before the shuffle.
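
A minimal sketch of that idea -- partial accumulators kept per (key, window)
cell ahead of the shuffle (an illustration, not SparkRunner's actual code; it
assumes window types with value equality, e.g. IntervalWindow):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.transforms.Combine.CombineFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    class PreShuffleCombine<K, I, A> {
      private final CombineFn<I, A, ?> fn;
      private final Map<KV<K, BoundedWindow>, A> partials = new HashMap<>();

      PreShuffleCombine(CombineFn<I, A, ?> fn) {
        this.fn = fn;
      }

      // Fold each element into the accumulator for its (key, window) cell,
      // so the shuffle carries one accumulator per cell instead of every
      // raw element assigned to that window.
      void add(K key, BoundedWindow window, I value) {
        KV<K, BoundedWindow> cell = KV.of(key, window);
        A acc = partials.get(cell);
        if (acc == null) {
          acc = fn.createAccumulator();
        }
        partials.put(cell, fn.addInput(acc, value));
      }
    }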


> Either way, these are runtime conditions that are not known in advance,
> so they cannot be used in pipeline translation.
>

True. (I suppose one could track and make such decisions dynamically, but
that might not be worth the cost/complexity.)


> I have created a JIRA issue [1] to track this and will try some experiments.
>

Thanks.


>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7574
>
> On 6/14/19 1:42 PM, Robert Bradshaw wrote:
> > On Fri, Jun 14, 2019 at 1:02 PM Jan Lukavský  wrote:
> >>   > Interesting. However, there we should never need to sort the windows
> >> of the input, only the set of live windows (of which there may be any
> >> number regardless of whether WindowFn does singleton assignments, and
> >> only then in the merging case).
> >>
> >> Ack, however the current implementation creates the accumulator from the
> >> input and therefore sorts windows for all elements. Moreover, it doesn't
> >> distinguish between merging and non-merging windows.
> > Yes, creating the accumulator from the input (and then merging the created
> > accumulators) is where the insane overhead comes from. We should
> > not use that pattern.
> >
> >>   > This seems like a merging vs. non-merging choice, not a
> >> single-vs-multiple window choice.
> >>
> >> I'd say the condition should be non-merging && not many windows per
> >> element, because otherwise it makes sense to group the windows even though
> >> it is non-merging windowing (sliding with a small slide step). Otherwise
> >> we would explode the data too much.
> > It depends on the typical number of elements per window. E.g. consider
> > sliding windows with a size of one hour and a periodicity of one
> > minute. If we have one datum per hour, better to not explode. If we
> > have one datum per minute, it's a wash. If we have one datum per
> > second, much better to explode.
> >
> >> On 6/14/19 12:19 PM, Robert Bradshaw wrote:
> >>> On Fri, Jun 14, 2019 at 12:10 PM Jan Lukavský  wrote:
>  Hi Robert,
> 
>  thanks for the discussion. I will create a JIRA with a summary of this.
>  Some comments inline.
> 
>  Jan
> 
>  On 6/14/19 10:49 AM, Robert Bradshaw wrote:
> > On Thu, Jun 13, 2019 at 8:43 PM Jan Lukavský 
> wrote:
> >> On 6/13/19 6:10 PM, Robert Bradshaw wrote:
> >>
> >> On Thu, Jun 13, 2019 at 5:28 PM Jan Lukavský 
> wrote:
> >>> On 6/13/19 4:31 PM, Robert Bradshaw wrote:
> >>>
> >>> The comment fails to take into account the asymmetry between
> calling addInput vs. mergeAccumulators. It also focuses a lot on the
> asymptotic behavior, when the most common behavior is likely having a
> single (global) window.
> >>>
> >>> Yes, that occurred to me too. There are more questions here:
> >>>
> >>> a) it would help if WindowedValue#explodeWindows returned a
> List instead of an Iterable, because then optimizations would be possible
> based on the number of windows (e.g. when there is only a single window,
> there is no need to sort anything). This should be a simple change, as it
> already is a List.
> >> Well, I'm wary of this change, but we could always create a list
> out of it (via cast or ImmutableList.copyOf) if we needed.
> >>
> >> Why so? I thought this would be the least objectionable change,
> because it actually *is* a List, and there is no interface; it is just a
> public method that needs to be changed to state the fact correctly. A
> Collection would be the same. Iterable is for cases where you don't
> exactly know if the data is stored in memory or loaded from somewhere else,
> and hence the size cannot be determined in advance. This is not the case
> for WindowFn.
> > It constrains us in that if WindowFn returns an iterable and we want
> > to store it as such we can no longer do so. I'm also not seeing what
> > optimization there is here--the thing we'd want to sort is the set of
> > existing windows (plus perhaps this one). Even if we wanted to do the
> > sort here, sort of a 1-item list should be insanely cheap.
>  Agreed, it should probably suffice to do this on the WindowFn.
> Then
>  there is no need to retrieve the number 

Beam Dependency Check Report (2019-06-17)

2019-06-17 Thread Apache Jenkins Server

High Priority Dependency Updates Of Beam Python SDK:


  Dependency Name | Current Version | Latest Version | Current Release Date | Latest Release Date | JIRA Issue
  google-cloud-bigquery | 1.6.1 | 1.15.0 | 2019-01-21 | 2019-06-17 | BEAM-5537
  google-cloud-core | 0.29.1 | 1.0.2 | 2019-02-04 | 2019-06-17 | BEAM-5538
  mock | 2.0.0 | 3.0.5 | 2019-05-20 | 2019-05-20 | BEAM-7369
  oauth2client | 3.0.0 | 4.1.3 | 2018-12-10 | 2018-12-10 | BEAM-6089
  Sphinx | 1.8.5 | 2.1.1 | 2019-05-20 | 2019-06-17 | BEAM-7370
High Priority Dependency Updates Of Beam Java SDK:


  Dependency Name | Current Version | Latest Version | Current Release Date | Latest Release Date | JIRA Issue
  com.google.auto.service:auto-service | 1.0-rc2 | 1.0-rc5 | 2014-10-25 | 2019-03-25 | BEAM-5541
  com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin | 0.17.0 | 0.21.0 | 2019-02-11 | 2019-03-04 | BEAM-6645
  org.conscrypt:conscrypt-openjdk | 1.1.3 | 2.1.0 | 2018-06-04 | 2019-04-03 | BEAM-5748
  javax.servlet:javax.servlet-api | 3.1.0 | 4.0.1 | 2013-04-25 | 2018-04-20 | BEAM-5750
  org.eclipse.jetty:jetty-server | 9.2.10.v20150310 | 9.4.19.v20190610 | 2015-03-10 | 2019-06-10 | BEAM-5752
  org.eclipse.jetty:jetty-servlet | 9.2.10.v20150310 | 9.4.19.v20190610 | 2015-03-10 | 2019-06-10 | BEAM-5753
  junit:junit | 4.13-beta-1 | 4.13-beta-3 | 2018-11-25 | 2019-05-05 | BEAM-6127
  com.github.spotbugs:spotbugs-annotations | 3.1.11 | 4.0.0-beta2 | 2019-01-21 | 2019-05-22 | BEAM-6951

 A dependency update is high priority if it satisfies one of the following criteria (see the sketch after this list):

 - It has a major version update available, e.g. org.assertj:assertj-core 2.5.0 -> 3.10.0;
 - It is over 3 minor versions behind the latest version, e.g. org.tukaani:xz 1.5 -> 1.8;
 - The currently used version is more than 180 days older than the latest release, e.g. com.google.auto.service:auto-service 2014-10-24 -> 2017-12-11.
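
 A sketch of how these criteria could be checked (a hypothetical helper, not
 the actual report job; versions are simplified to major/minor numbers and
 release dates to epoch days):

    class DependencyPriority {
      static boolean isHighPriority(int curMajor, int curMinor, long curReleaseDay,
                                    int latestMajor, int latestMinor, long latestReleaseDay) {
        boolean majorUpdate = latestMajor > curMajor;                                 // criterion 1
        boolean minorsBehind = latestMajor == curMajor && latestMinor - curMinor > 3; // criterion 2
        boolean staleOver180Days = latestReleaseDay - curReleaseDay > 180;            // criterion 3
        return majorUpdate || minorsBehind || staleOver180Days;
      }
    }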

 In Beam, we make a best-effort attempt at keeping all dependencies up-to-date.
 In the future, issues will be filed and tracked for these automatically,
 but in the meantime you can search for existing issues or open a new one.

 For more information: Beam Dependency Guide

[Reminder] Beam 2.14 Release to be cut on Wed, June 19 at 6pm UTC

2019-06-17 Thread Anton Kedin
This is a reminder that I am planning to cut the release branch on Wednesday,
June 19, at 11am PDT (Seattle local time, corresponding to 19:00 GMT+1 and
18:00 UTC) [1]. Please make sure all the code you want in the release is
submitted by that time, and that all blocking Jiras have the release
version attached [2].

Thank you,
Anton

[1]
https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com
[2]
https://issues.apache.org/jira/browse/BEAM-7478?jql=project%20%3D%20BEAM%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%202.14.0


GitHub checks not running

2019-06-17 Thread Anton Kedin
Hi dev@,

Does anyone have context on why the checks might not get triggered on pull
requests today? E.g. https://github.com/apache/beam/pull/8822

Regards,
Anton


Re: GitHub checks not running

2019-06-17 Thread Anton Kedin
They are getting triggered now.

On Mon, Jun 17, 2019 at 9:10 AM Anton Kedin  wrote:

> Hi dev@,
>
> Does anyone have context on why the checks might not get triggered on pull
> requests today? E.g. https://github.com/apache/beam/pull/8822
>
> Regards,
> Anton
>


Re: GitHub checks not running

2019-06-17 Thread Udi Meiri
I think we reached an upper limit on the Jenkins queue length (the grey
flat line):
[image: graph.png]
(https://builds.apache.org/label/beam/load-statistics?type=sec10)


On Mon, Jun 17, 2019 at 9:27 AM Anton Kedin  wrote:

> They are getting triggered now.
>
> On Mon, Jun 17, 2019 at 9:10 AM Anton Kedin  wrote:
>
>> Hi dev@,
>>
>> Does anyone have context on why the checks might not get triggered on pull
>> requests today? E.g. https://github.com/apache/beam/pull/8822
>>
>> Regards,
>> Anton
>>
>




Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-17 Thread Claire McGinty
Hey dev@!

Myself and a few other Spotify data engineers have put together a design
doc for SMB Join support in Beam, and have a working Java implementation
we've started to put up for PR ([0], [1], [2]). There's more detailed
information in the document, but the tl;dr is that SMB is a strategy to
optimize joins for file-based sources by modifying the initial write
operation to write records in sorted buckets based on the desired join key.
This means that subsequent joins of datasets written in this way are only
sequential file reads, no shuffling involved. We've seen some pretty
substantial performance speedups with our implementation and would love to
get it checked in to Beam's Java SDK.
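
As a rough illustration of why the read side needs no shuffle, here is a
minimal sort-merge join over two already-sorted inputs (plain Java with
hypothetical types, not the code from the PRs; duplicate keys are skipped to
keep the sketch short):

    import java.util.List;
    import java.util.Map.Entry;

    class SortedMergeJoinSketch {
      // Both sides must already be sorted by key, which is exactly what
      // the SMB write side guarantees within each bucket.
      static void join(List<Entry<String, String>> left,
                       List<Entry<String, String>> right) {
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
          int cmp = left.get(i).getKey().compareTo(right.get(j).getKey());
          if (cmp < 0) {
            i++;        // no match for the current left key; advance left
          } else if (cmp > 0) {
            j++;        // no match for the current right key; advance right
          } else {
            // Matching keys: emit the joined pair and advance both sides.
            System.out.println(left.get(i).getKey() + ": "
                + left.get(i).getValue() + " / " + right.get(j).getValue());
            i++;
            j++;
          }
        }
      }
    }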

We'd appreciate any suggestions or feedback on our proposal--the design doc
should be public to comment on.

Thanks!
Claire / Neville


Re: [Forked] BEAM-4046 (was [PROPOSAL] Introduce beam-sdks-java gradle project)

2019-06-17 Thread Lukasz Cwik
I have opened up https://github.com/apache/beam/pull/8881 which migrates
the Jenkins runs to use the new names, since it has been a month since the
rename went in.
After this, we should remove the deprecated project mappings and require
everyone to use the directory-based names instead of the artifact names, as
agreed upon in this thread.

On Mon, May 13, 2019 at 1:57 AM Michael Luckey  wrote:

> Thanks Kenn!
>
> This change will be merged in today, about 13:00 Pacific Time.
>
> From then on, we will have ditched the flat project layout, i.e. projects
> will need to be referenced by their full path instead of some 'symbolic' name.
> E.g. project
> ':beam-sdks-java-core' will be referenced as ':sdks:java:core', which is
> just the relative folder path to the project.
>
> We will not run the seed job immediately, because a compatibility layer
> for the Gradle command line is implemented, which enables running tasks
> referenced by the old-style names, i.e.
> ':beam-sdks-java-core:build' still works for a while. After running
> the seed job, conflicting PRs might not be able to run on Jenkins anymore,
> but a rebase is required anyway,
> so this should be done as soon as possible.
>
> Regarding the IDE, an import is required. This should work, apart from the
> known issues, as before. Unfortunately I was unable to test every new option
> available. E.g. now we do have those
> 'runners:validateRunner' et al. tasks which did not 'exist' before. They
> should work, but if any issues arise, please open a jira and feel free to
> ping/assign it to me.
>
> Thanks,
>
> michel
>
> On Thu, May 2, 2019 at 6:00 PM Kenneth Knowles  wrote:
>
>> The issue has been discussed for a full month, with no objections. I'd
>> call that lazy consensus. And since you have found a way to be backwards
>> compatible, it doesn't even have to impact docs or scripts. This is great.
>>
>> Kenn
>>
>> On Thu, May 2, 2019 at 8:43 AM Michael Luckey 
>> wrote:
>>
>>> Hi,
>>>
>>> after implementing the required changes to switch from the current flat
>>> Gradle project structure to the hierarchical represented by the folder
>>> hierarchy I propose to merge the changes [1] after cut of next release
>>> branch (which is scheduled around May, 8th.)
>>>
>>> Does anyone have any concerns or objections doing this change (or doing
>>> it now) or can we proceed as suggested? Any questions about this?
>>>
>>> Best,
>>>
>>> michel
>>>
>>> [1] https://github.com/apache/beam/pull/8194
>>>
>>>
>>>
>>> On Thu, Apr 11, 2019 at 1:04 AM Michael Luckey 
>>> wrote:
>>>
 To my understanding, that's it, yes. Of course, there might be other
 places/plugins which use plugin.group. But maven coordinates are definitely
 those which need to be consistent.

 On Thu, Apr 11, 2019 at 12:57 AM Lukasz Cwik  wrote:

> We may be saying the same thing, but I wanted to be clear that we only
> need to override the default that the publishing plugin uses to always be
> "org.apache.beam" instead of defaulting to project.group
>
> On Wed, Apr 10, 2019 at 3:22 PM Kenneth Knowles 
> wrote:
>
>> So, if we set the "group" on projects only as part of publishing then
>> everything starts to work? That sounds ideal.
>>
>> Kenn
>>
>> On Tue, Apr 9, 2019 at 3:49 PM Lukasz Cwik  wrote:
>>
>>> It would be good if we did as much as possible to make our project a
>>> conventional Gradle project. It means that more people will be
>>> familiar with the setup, our setup will likely require less maintenance
>>> with version bumps in Gradle, and also that examples/solutions online
>>> will relate better to our project.
>>>
>>> On Mon, Apr 8, 2019 at 6:22 PM Michael Luckey 
>>> wrote:
>>>
 After playing around, it turns out to be rather straightforward.
 The problem is not really caused by a Gradle bug, but more by the usual
 issue that deviating from gradle defaults/conventions often causes
 headaches.

 In this case the conflicts are caused by Beam eagerly setting
 project.group for all modules [1]. Of course this implies removing
 structure and as such causes these name conflicts. I do not think we need
 to have that unique group set on our projects. So not globally rewriting,
 but using the default group (== project.path) resolves this issue. Of
 course, we then do have to set values accordingly in all places which
 default to project.group and where we would like to have our Maven group
 id, e.g. [2]

 Now, before I am going to invest more time in testing, I would like
 to start the discussion on whether we would like to move to this more
 hierarchical project representation, or prefer to stop here and stay with
 the current state. If we prefer the current flat structure, we might
 consider to re

Re: [DISCUSS] Portability representation of schemas

2019-06-17 Thread Brian Hulette
Realized I completely ignored one of your points, added another response
inline.

On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw  wrote:

> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax  wrote:
> >
> > Spoke to Brian about his proposal. It is essentially this:
> >
> > We create PortableSchemaCoder, with a well-known URN. This coder is
> parameterized by the schema (i.e. list of field name -> field type pairs).
>
> Given that we have a field type that is (list of field names -> field
> type pairs), is there a reason to do this enumeration at the top level
> as well? This would likely also eliminate some of the strangeness
> where we want to treat a PCollection with a single-field row as a
> PCollection with just that value instead.
>

This is part of what I was suggesting in my "Root schema is a logical type"
alternative [1], except that the language about SDK-specific logical types
is now obsolete. I'll update it to better reflect this alternative.
I do think at the very least we should just have one (list of field names
-> field type pairs) that is re-used, which is what I did in my PR [2].

[1]
https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin
[2]
https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686
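
For concreteness, the kind of schema being discussed, sketched with Beam's
Java Schema API (the field names are made up for illustration):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    class SchemaSketch {
      // A schema is an ordered list of (field name -> field type) pairs;
      // a Row is a value conforming to one.
      static Row example() {
        Schema schema = Schema.builder()
            .addStringField("userId")
            .addInt64Field("eventCount")
            .build();
        return Row.withSchema(schema)
            .addValues("alice", 42L)
            .build();
      }
    }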


>
> > Java also continues to have its own CustomSchemaCoder. This is
> parameterized by the schema as well as the to/from functions needed to make
> the Java API "nice."
> >
> > When the expansion service expands a Java PTransform for usage across
> languages, it will add a transform mapping the  PCollection with
> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
> Java can maintain the information needed to maintain its API (and Python
> can do the same), but there's no need to shove this information into the
> well-known portable representation.
> >
> > Brian, can you confirm that this was your proposal? If so, I like it.
>
> The major downside of this that I see is that it assumes that
> transparency is only needed at certain "boundaries" and everything
> between these boundaries is opaque. I think we'd be better served by a
> format where schemas are transparently represented throughout. For
> example, the "boundaries" between runner and SDK are not known at
> pipeline construction time, and we want the runner <-> SDK
> communication to understand the schemas to be able to use more
> efficient transport mechanisms (e.g. batches of arrow records). It may
> also be common for a pipeline in language X to invoke two transforms
> in language Y in succession (e.g. two SQL statements) in which case
> introducing two extra transforms in the expansion service would be
> wasteful. I also think we want to allow the flexibility for runners to
> swap out transforms as optimizations regardless of construction-time
> boundaries (e.g. implementing a projection natively, rather than
> outsourcing to the SDK).
>
> Are the to/from conversion functions the only extra information needed
> to make the Java APIs nice? If so, can they be attached to the
> operations themselves (where it seems they're actually needed/used),
> rather than to the schema/coder of the PCollection? Alternatively, I'd
> prefer this be opaque metadata attached to a transparent schema rather
> than making the whole schema opaque.
>
> > We've gone back and forth discussing abstractions for over a month now. I
> suggest that the next step should be to create a PR, and move discussion to
> that PR. Having actual code can often make discussion much more concrete.
>
> +1 to a PR, though I feel like there are fundamental high-level issues
> that are still not decided. (I suppose we should be open to throwing
> whole PRs away in that case.) There are certainly pieces that we'll
> know that we need (like the ability to serialize a row consistently in
> all languages) we can get in immediately.
>
> > Reuven
> >
> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw 
> wrote:
> >>
> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax  wrote:
> >>>
> >>>
> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles 
> wrote:
> 
>  Can we choose a first step? I feel there's consensus around:
> 
>   - the basic idea of what a schema looks like, ignoring logical types
> or SDK-specific bits
>   - the version of logical type which is a standardized URN+payload
> plus a representation
> 
>  Perhaps we could commit this and see what it looks like to try to use
> it?
> >>
> >>
> >> +1
> >>
> 
>  It also seems like there might be consensus around the idea of each
> of:
> 
>   - a coder that simply encodes rows; its payload is just a schema; it
> is minimalist, canonical
> 
>   - a coder that encodes a non-row using the serialization format of a
> row; this has to be a coder (versus Convert transforms) so that to/from row
> conversions can be elided when primitives are fused (just like to/from
> bytes is elided)
> >>
> >>
>