Re: [PROPOSAL] Prepare Beam 2.9.0 release

2018-12-05 Thread David Morávek
Would it be possible to cherry-pick this patch into 2.9? It removes a
forgotten Kryo dependency from the Euphoria DSL, which we don't want users to
start depending on unless the *beam-sdks-java-extension-kryo* dependency is
explicitly used.

https://github.com/apache/beam/pull/7195

Thanks,
D.

On Tue, Dec 4, 2018 at 6:52 AM Chamikara Jayalath 
wrote:

> Update:
>
> All runners except Dataflow passed quickstart validation on the release
> branch.
>
> Dataflow validation is blocked due to a Google internal issue that
> prevents building Dataflow containers based on latest code. Created blocker
> https://issues.apache.org/jira/browse/BEAM-6173 since release is
> essentially blocked by this.
>
> I'll validate DataflowRunner and create a release candidate when this
> blocker is resolved.
>
> Thanks,
> Cham
>
> On Mon, Dec 3, 2018 at 7:53 AM Chamikara Jayalath 
> wrote:
>
>> I've been running tests on the existing branch. Will try to cut a
>> candidate today.
>>
>> Seems like this is a small fix that fixes a flake. So feel free to send a
>> cherry-pick.
>>
>> Thanks,
>> Cham
>>
>> On Mon, Dec 3, 2018 at 4:28 AM Maximilian Michels  wrote:
>>
>>> How far are we with the release? If the release branch hasn't been
>>> frozen, I'd like to cherry-pick
>>> https://github.com/apache/beam/pull/7171/files
>>>
>>> Thanks,
>>> Max
>>>
>>> On 30.11.18 04:17, Lukasz Cwik wrote:
>>> > I got to thank Steve Niemitz for double checking my work and pointing
>>> > out an error which helped narrow down the BEAM-6102 issue.
>>> >
>>> > On Thu, Nov 29, 2018 at 2:05 PM Chamikara Jayalath <
>>> chamik...@google.com
>>> > > wrote:
>>> >
>>> > Blockers were resolved and fixes were cherry-picked to the release
>>> > branch. I'll continue the release process.
>>> >
>>> > Thanks,
>>> > Cham
>>> >
>>> > On Mon, Nov 26, 2018 at 10:50 AM Lukasz Cwik >> > > wrote:
>>> >
>>> > I'm working on BEAM-6102 and after 12 hours on the issue I have
>>> > not made much real progress. I initially suspected its a
>>> shading
>>> > issue with the Dataflow worker jar but can't reproduce the
>>> issue
>>> > without running a full Dataflow pipeline. Any help would
>>> > be appreciated, context of what I have tried is on the JIRA and
>>> > you can reach out to me on Slack.
>>> >
>>> > On Mon, Nov 26, 2018 at 9:50 AM Chamikara Jayalath
>>> > mailto:chamik...@google.com>> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > Currently there are two blockers for the 2.9.0 release.
>>> >
>>> > * Dataflow cannot deserialize DoFns -
>>> > https://issues.apache.org/jira/browse/BEAM-6102
>>> > * [SQL] Nexmark 5, 7 time out -
>>> > https://issues.apache.org/jira/browse/BEAM-6082
>>> >
>>> > We'll postpone cutting the release candidate till these
>>> > issues are resolved.
>>> >
>>> > Thanks,
>>> > Cham
>>> >
>>> >
>>> > On Wed, Nov 21, 2018 at 1:22 PM Kenneth Knowles
>>> > mailto:k...@apache.org>> wrote:
>>> >
>>> > You could `git checkout -b release-2.9.0
>>> > `. But cherrypicking fixes is also
>>> easy.
>>> >
>>> > Kenn
>>> >
>>> > On Wed, Nov 21, 2018 at 1:06 PM Chamikara Jayalath
>>> > mailto:chamik...@google.com>>
>>> wrote:
>>> >
>>> > I went through Jenkins test suites and failures
>>> > seems to be known issues with JIRAs that are
>>> release
>>> > blockers. So we'll cherry-pick fixes to these.
>>> > In general though I think it might be hard to pick
>>> > an exact "green" time for cutting the release just
>>> > by eyeballing since different test suites run at
>>> > different times.
>>> >
>>> > - Cham
>>> >
>>> >
>>> >
>>> > On Wed, Nov 21, 2018 at 12:59 PM Valentyn
>>> Tymofieiev
>>> > mailto:valen...@google.com>>
>>> > wrote:
>>> >
>>> > It looks like 2.9.0 branch includes commits
>>> from
>>> > https://github.com/apache/beam/pull/7029,
>>> which
>>> > break Python Postcommit Test suite. Rollback is
>>> > in flight:
>>> > https://github.com/apache/beam/pull/7107, and
>>> > will need to be cherry-picked to release
>>> branch.
>>> >
>>> > I think we should try to adjust release branch
>>> > cutting process so that all relevant test
>>> suites
>>> > pass on the release branch when we cut it.
>>> >
>>> > On Wed, Nov 21, 2018 at 11:31 AM Chamikara
>>> >  

Re: [PROPOSAL] Prepare Beam 2.9.0 release

2018-12-05 Thread David Morávek
I don’t think it is a blocker. If you already did the RC cut, please ignore this.

Thanks,
D.

Sent from my iPhone

> On 5 Dec 2018, at 11:03, Chamikara Jayalath  wrote:
> 
> Hi David,
> 
> I have already built the RC1 and started running some tests.
> 
>  Do you think this is a blocker? If not, I think we should hold it till 2.10 
> (approx. 4 - 6 weeks). If you think it's a blocker I can build the candidate 
> again and run tests.
> 
> Thanks,
> Cham
> 
>> On Wed, Dec 5, 2018 at 1:37 AM David Morávek  wrote:
>> Would it be possible to cherry-pick this patch into 2.9? It removes a 
>> forgotten Kryo dependency from the Euphoria DSL, which we don't want users to 
>> start depending on unless the beam-sdks-java-extension-kryo dependency is 
>> explicitly used.
>> 
>> https://github.com/apache/beam/pull/7195
>> 
>> Thanks,
>> D.
>> 
>>> On Tue, Dec 4, 2018 at 6:52 AM Chamikara Jayalath  
>>> wrote:
>>> Update:
>>> 
>>> All runners except Dataflow passed quickstart validation on the release 
>>> branch.
>>> 
>>> Dataflow validation is blocked due to a Google internal issue that prevents 
>>> building Dataflow containers based on latest code. Created blocker 
>>> https://issues.apache.org/jira/browse/BEAM-6173 since release is 
>>> essentially blocked by this.
>>> 
>>> I'll validate DataflowRunner and create a release candidate when this 
>>> blocker is resolved.
>>> 
>>> Thanks,
>>> Cham
>>> 
>>>> On Mon, Dec 3, 2018 at 7:53 AM Chamikara Jayalath  
>>>> wrote:
>>>> I've been running tests on the existing branch. Will try to cut a 
>>>> candidate today.
>>>> 
>>>> Seems like this is a small fix that fixes a flake. So feel free to send a 
>>>> cherry-pick.
>>>> 
>>>> Thanks,
>>>> Cham
>>>> 
>>>>> On Mon, Dec 3, 2018 at 4:28 AM Maximilian Michels  wrote:
>>>>> How far are we with the release? If the release branch hasn't been 
>>>>> frozen, I'd like to cherry-pick 
>>>>> https://github.com/apache/beam/pull/7171/files
>>>>> 
>>>>> Thanks,
>>>>> Max
>>>>> 
>>>>> On 30.11.18 04:17, Lukasz Cwik wrote:
>>>>> > I got to thank Steve Niemitz for double checking my work and pointing 
>>>>> > out an error which helped narrow down the BEAM-6102 issue.
>>>>> > 
>>>>> > On Thu, Nov 29, 2018 at 2:05 PM Chamikara Jayalath 
>>>>> > >>>> > <mailto:chamik...@google.com>> wrote:
>>>>> > 
>>>>> > Blockers were resolved and fixes were cherry-picked to the release
>>>>> > branch. I'll continue the release process.
>>>>> > 
>>>>> > Thanks,
>>>>> > Cham
>>>>> > 
>>>>> > On Mon, Nov 26, 2018 at 10:50 AM Lukasz Cwik >>>> > <mailto:lc...@google.com>> wrote:
>>>>> > 
>>>>> > I'm working on BEAM-6102 and after 12 hours on the issue I have
>>>>> > not made much real progress. I initially suspected its a shading
>>>>> > issue with the Dataflow worker jar but can't reproduce the issue
>>>>> > without running a full Dataflow pipeline. Any help would
>>>>> > be appreciated, context of what I have tried is on the JIRA and
>>>>> > you can reach out to me on Slack.
>>>>> > 
>>>>> > On Mon, Nov 26, 2018 at 9:50 AM Chamikara Jayalath
>>>>> > mailto:chamik...@google.com>> wrote:
>>>>> > 
>>>>> > Hi All,
>>>>> > 
>>>>> > Currently there are two blockers for the 2.9.0 release.
>>>>> > 
>>>>> > * Dataflow cannot deserialize DoFns -
>>>>> > https://issues.apache.org/jira/browse/BEAM-6102
>>>>> > * [SQL] Nexmark 5, 7 time out -
>>>>> > https://issues.apache.org/jira/browse/BEAM-6082
>>>>> > 
>>>>> > We'll postpone cutting the release candidate till these
>>>>> > issues are resolved.
>>>>> > 
>>

Re: Spark-optimized Shuffle (SOS) any update?

2018-12-20 Thread David Morávek
This is awesome news! Is there anything we can do to help? We are
currently facing huge performance penalties due to this issue.

Thanks,
David

On Wed, Dec 19, 2018 at 5:43 PM Ilan Filonenko  wrote:

> Recently, the community has actively been working on this. The JIRA to
> follow is:
> https://issues.apache.org/jira/browse/SPARK-25299. A group of various
> companies including Bloomberg and Palantir are in the works of a WIP
> solution that implements a varied version of Option #5 (which is elaborated
> upon in the google doc linked in the JIRA summary).
>
> On Wed, Dec 19, 2018 at 5:20 AM  wrote:
>
>> Hi everyone,
>> we are facing same problems as Facebook had, where shuffle service is
>> a bottleneck. For now we solved that with large task size (2g) to reduce
>> shuffle I/O.
>>
>> I saw very nice presentation from Brian Cho on Optimizing shuffle I/O at
>> large scale[1]. It is an implementation of the white paper[2].
>> Brian Cho at the end of the lecture kindly mentioned about plans to
>> contribute it back to Spark[3]. I checked mailing list and spark JIRA and
>> didn't find any ticket on this topic.
>>
>> Please, does anyone have a contact on someone from Facebook who could know
>> more about this? Or are there some plans to bring similar optimization to
>> Spark?
>>
>> [1] https://databricks.com/session/sos-optimizing-shuffle-i-o
>> [2] https://haoyuzhang.org/publications/riffle-eurosys18.pdf
>> [3]
>> https://image.slidesharecdn.com/5brianchoerginseyfe-180613004126/95/sos-optimizing-shuffle-io-with-brian-cho-and-ergin-seyfe-30-638.jpg?cb=1528850545
>>
>


Re: Spark-optimized Shuffle (SOS) any update?

2018-12-20 Thread David Morávek
I've just quickly gone through the design doc and it seems that it has
nothing to do with the paper Marek mentioned. The paper tries to solve the
problem of the I/O ops required for shuffle growing quadratically with the
number of tasks (shuffle files), which is why we need to keep the number of
tasks low.

Am I missing something?

Thanks,
D.

On Thu, Dec 20, 2018 at 12:27 PM David Morávek  wrote:

> This is awesome news! Is there anything we can do to help? We are
> currently facing huge performance penalties due to this issue.
>
> Thanks,
> David
>
> On Wed, Dec 19, 2018 at 5:43 PM Ilan Filonenko  wrote:
>
>> Recently, the community has actively been working on this. The JIRA to
>> follow is:
>> https://issues.apache.org/jira/browse/SPARK-25299. A group of various
>> companies including Bloomberg and Palantir are in the works of a WIP
>> solution that implements a varied version of Option #5 (which is elaborated
>> upon in the google doc linked in the JIRA summary).
>>
>> On Wed, Dec 19, 2018 at 5:20 AM  wrote:
>>
>>> Hi everyone,
>>> we are facing same problems as Facebook had, where shuffle service
>>> is a bottleneck. For now we solved that with large task size (2g) to reduce
>>> shuffle I/O.
>>>
>>> I saw very nice presentation from Brian Cho on Optimizing shuffle I/O at
>>> large scale[1]. It is an implementation of the white paper[2].
>>> Brian Cho at the end of the lecture kindly mentioned about plans to
>>> contribute it back to Spark[3]. I checked mailing list and spark JIRA and
>>> didn't find any ticket on this topic.
>>>
>>> Please, does anyone have a contact on someone from Facebook who could
>>> know more about this? Or are there some plans to bring similar optimization
>>> to Spark?
>>>
>>> [1] https://databricks.com/session/sos-optimizing-shuffle-i-o
>>> [2] https://haoyuzhang.org/publications/riffle-eurosys18.pdf
>>> [3]
>>> https://image.slidesharecdn.com/5brianchoerginseyfe-180613004126/95/sos-optimizing-shuffle-io-with-brian-cho-and-ergin-seyfe-30-638.jpg?cb=1528850545
>>>
>>


Re: [ANNOUNCE] New committer announcement: Gleb Kanterov

2019-01-25 Thread David Morávek
Congratulations!

Sent from my iPhone

> On 25 Jan 2019, at 20:41, Kai Jiang  wrote:
> 
> Congratulations!
> 
>> On Fri, Jan 25, 2019 at 10:01 AM Rui Wang  wrote:
>> Congratulations!
>> 
>> -Rui
>> 
>>> On Fri, Jan 25, 2019 at 9:58 AM Ruoyun Huang  wrote:
>>> Congratulations Gleb! 
>>> 
 On Fri, Jan 25, 2019 at 9:18 AM Scott Wegner  wrote:
 Congrats, and welcome Gleb!
 
> On Fri, Jan 25, 2019 at 9:15 AM Suneel Marthi  wrote:
> Congratulations
> 
>> On Fri, Jan 25, 2019 at 12:04 PM Anton Kedin  wrote:
>> Congrats!
>> 
>>> On Fri, Jan 25, 2019 at 8:54 AM Ismaël Mejía  wrote:
>>> Well deserved, congratulations Gleb!
>>> 
>>> On Fri, Jan 25, 2019 at 10:47 AM Etienne Chauchot 
>>>  wrote:
>>> >
>>> > Congrats Gleb and welcome onboard !
>>> >
>>> > Etienne
>>> >
>>> > Le vendredi 25 janvier 2019 à 10:39 +0100, Alexey Romanenko a écrit :
>>> >
>>> > Congrats to Gleb and welcome on board!
>>> >
>>> > On 25 Jan 2019, at 09:22, Tim Robertson  
>>> > wrote:
>>> >
>>> > Welcome Gleb and congratulations!
>>> >
>>> > On Fri, Jan 25, 2019 at 8:06 AM Kenneth Knowles  
>>> > wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Please join me and the rest of the Beam PMC in welcoming a new 
>>> > committer: Gleb Kanterov
>>> >
>>> > Gleb started contributing to Beam and quickly dove deep, doing some 
>>> > sensitive fixes to schemas, also general build issues, Beam SQL, 
>>> > Avro, and more. In consideration of Gleb's technical and community 
>>> > contributions, the Beam PMC trusts Gleb with the responsibilities of 
>>> > a Beam committer [1].
>>> >
>>> > Thank you, Gleb, for your contributions.
>>> >
>>> > Kenn
>>> >
>>> > [1] 
>>> > https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>> >
>>> >
 
 
 -- 
 
 
 
 
 Got feedback? tinyurl.com/swegner-feedback
>>> 
>>> 
>>> -- 
>>> 
>>> Ruoyun  Huang
>>> 


Re: [ANNOUNCE] New committer announcement: Michael Luckey

2019-02-27 Thread David Morávek
Congrats Michael! 🍾

D.

> On 28 Feb 2019, at 03:27, Ismaël Mejía  wrote:
> 
> Congratulations Michael, and thanks for all the contributions!
> 
>> On Wed, Feb 27, 2019 at 6:30 PM Ankur Goenka  wrote:
>> 
>> Congratulations Michael!
>> 
>>> On Wed, Feb 27, 2019 at 2:25 PM Thomas Weise  wrote:
>>> 
>>> Congrats Michael!
>>> 
>>> 
 On Wed, Feb 27, 2019 at 12:41 PM Gleb Kanterov  wrote:
 
 Congratulations and welcome!
 
> On Wed, Feb 27, 2019 at 8:57 PM Connell O'Callaghan  
> wrote:
> 
> Excellent thank you for sharing Kenn!!!
> 
> Michael congratulations for this recognition of your contributions to 
> advancing BEAM
> 
>> On Wed, Feb 27, 2019 at 11:52 AM Kenneth Knowles  wrote:
>> 
>> Hi all,
>> 
>> Please join me and the rest of the Beam PMC in welcoming a new 
>> committer: Michael Luckey
>> 
>> Michael has been contributing to Beam since early 2017. He has fixed 
>> many build and developer environment issues, noted and root-caused 
>> breakages on master, generously reviewed many others' changes to the 
>> build. In consideration of Michael's contributions, the Beam PMC trusts 
>> Michael with the responsibilities of a Beam committer [1].
>> 
>> Thank you, Michael, for your contributions.
>> 
>> Kenn
>> 
>> [1] 
>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
 
 
 
 --
 Cheers,
 Gleb


Re: [Announcement] New Website for Beam Summits

2019-03-20 Thread David Morávek
This is great! Thanks for all of the hard work you're putting into this.

D.

On Wed, Mar 20, 2019 at 1:38 PM Maximilian Michels  wrote:

> Not a bug, it's a feature ;)
>
> On 20.03.19 07:23, Kenneth Knowles wrote:
> > Very nice. I appreciate the emphasis on coffee [1] [2] [3] though I
> > suspect there may be a rendering bug.
> >
> > Kenn
> >
> > [1] https://beamsummit.org/schedule/2019-06-19?sessionId=1
> > [2] https://beamsummit.org/schedule/2019-06-19?sessionId=3
> > [3] https://beamsummit.org/schedule/2019-06-19?sessionId=4
> >
> > On Tue, Mar 19, 2019 at 4:43 AM Łukasz Gajowy  > > wrote:
> >
> > Looks great! Thanks for doing this! :)
> >
> > Łukasz
> >
> > wt., 19 mar 2019 o 12:30 Maximilian Michels  > > napisał(a):
> >
> > Great stuff! Looking forward to seeing many Beam folks in Berlin.
> >
> > In case you want to speak at Beam Summit Europe, the Call for
> > Papers is
> > open until April 1:
> https://sessionize.com/beam-summit-europe-2019
> >
> > -Max
> >
> > On 19.03.19 09:49, Matthias Baetens wrote:
> >  > Awesome Aizhamal! Great work and thanks for your continued
> > efforts on
> >  > this :) Looking forward to the summit.
> >  >
> >  > On Mon, 18 Mar 2019 at 23:17, Aizhamal Nurmamat kyzy
> >  > mailto:aizha...@google.com>
> > >>
> wrote:
> >  >
> >  > Hello everybody!
> >  >
> >  >
> >  > We are thrilled to announce the launch of beamsummit.org
> > 
> >  >  dedicated to Beam Summits!
> >  >
> >  >
> >  > The current version of the website provides information
> > about the
> >  > upcoming Beam Summit in Europe on June 19-20th, 2019. We
> > will update
> >  > it for the upcoming summits in Asia and North America
> > accordingly.
> >  > You can access all necessary information about the
> > conference theme,
> >  > speakers and sessions, the abstract submission timeline
> > and the
> >  > registration process, the conference venues and much more
> > that you
> >  > will find useful until and during the Beam Summits 2019.
> >  >
> >  >
> >  > We are working to make the website easy to use, so that
> > anyone who
> >  > is organizing a Beam event can rely on it. You can find
> > the code for
> >  > it in Github
> > .
> >  >
> >  > The pages will be updated on a regular basis, but we also
> > love
> >  > hearing thoughts from our community! Let us know if you
> > have any
> >  > questions, comments or suggestions, and help us improve.
> > Also, if
> >  > you are thinking of organizing a Beam event, please feel
> > free to
> >  > reach out  > >for support, and to use the
> >  > code in GitHub as well.
> >  >
> >  >
> >  > We sincerely hope that you like the new Beam Summit
> > website and will
> >  > find it useful for accessing information. Enjoy browsing
> > around!
> >  >
> >  >
> >  > Thanks,
> >  >
> >  > Aizhamal
> >  >
> >
>


Re: joda-time dependency version

2019-03-23 Thread David Morávek
If there are no objections from dev@, I'll try to proceed with an upgrade
to the latest version <https://jira.apache.org/jira/browse/BEAM-6895>
(2.10.1).

Kenn, I've found your issue <https://jira.apache.org/jira/browse/BEAM-5827>
for joda-time vendoring; is it still relevant? This might cause a breaking
change, as it is part of the user-facing API.

D.

On Thu, Mar 21, 2019 at 5:44 PM Kenneth Knowles  wrote:

> +dev@
>
> I don't know of any special reason we are using an old version.
>
> Kenn
>
> On Thu, Mar 21, 2019, 09:38 Ismaël Mejía  wrote:
>
>> Does anyone have any context on why we have such an old version of
>> Joda time (2.4 released on  2014!) and if there is any possible issue
>> upgrading it? If not maybe we can try to upgrade it..
>>
>> On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía  wrote:
>> >
>> > Mmmm interesting issue. There is also a plan to use a vendored version
>> > of joda-time not sure on the progress on that one.
>> > https://issues.apache.org/jira/browse/BEAM-5827
>> >
>> > For Beam 3 that's the idea but  so far there is not at ETA for Beam 3.
>> > https://issues.apache.org/jira/browse/BEAM-5530
>> >
>> > On Thu, Mar 21, 2019 at 4:15 PM rahul patwari
>> >  wrote:
>> > >
>> > > Hi David,
>> > >
>> > > The only incompatibility we have come across is this:
>> > > We have some timestamp format conversions in our project, where we
>> are converting from a timestamp format to another.
>> > >
>> > > With joda-time 2.4:
>> > > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
>> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
>> 19-Mar-15 -07:00".
>> > >
>> > > Whereas with joda-time 2.9.3:
>> > > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
>> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
>> 19-Mar-15 PDT".
>> > >
>> > > The javadoc for both the versions doesn't seem different though, for
>> 'z' DateTimeFormat.
>> > >
>> > > Even though the javadoc says - Zone names: Time zone names ('z')
>> cannot be parsed for both the versions, we are able to parse it in
>> joda-time 2.9.3.
>> > >
>> > > Also, joda-time will be replaced with java time with Beam 3?
>> > >
>> > > Thanks,
>> > > Rahul
>> > >
>> > > On Thu, Mar 21, 2019 at 5:37 PM David Morávek <
>> david.mora...@gmail.com> wrote:
>> > >>
>> > >> Hello Rahul, are there any incompatibilities you are running into
>> with spark version? These versions should be backward compatible.
>> > >>
>> > >> For jodatime doc:
>> > >> The main public API will remain backwards compatible for both source
>> and binary in the 2.x stream.
>> > >>
>> > >> This means you should be able to safely use Spark's version.
>> > >>
>> > >> D.
>> > >>
>> > >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>> > >>>
>> > >>> Hi Ismael,
>> > >>>
>> > >>> We are using Beam with Spark Runner and Spark 2.4 has joda-time
>> 2.9.3 as a dependency. So, we have used joda-time 2.9.3 in our shaded
>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
>> whether it would break anything in Beam.
>> > >>>
>> > >>> Will joda-time be replaced with java time in Beam 3? What is the
>> expected release date of Beam 3?
>> > >>>
>> > >>> Thanks,
>> > >>> Rahul
>> > >>>
>> > >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía 
>> wrote:
>> > >>>>
>> > >>>> Hello,
>> > >>>>
>> > >>>> The long term goal would be to get rid of joda-time but that won't
>> > >>>> happen until Beam 3.
>> > >>>> Any 'particular' reason or motivation to push the upgrade?
>> > >>>>
>> > >>>> Regards,
>> > >>>> Ismaël
>> > >>>>
>> > >>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>> > >>>>  wrote:
>> > >>>> >
>> > >>>> > Hi,
>> > >>>> >
>> > >>>> > Is there a plan to upgrade the dependency version of joda-time
>> to 2.9.3 or latest version?
>> > >>>> >
>> > >>>> >
>> > >>>> > Thanks,
>> > >>>> > Rahul
>>
>


docs: java-dependencies

2019-03-24 Thread David Morávek
Hello,

I've run into the java-dependencies page while upgrading joda-time. This page
is supposed to track dependencies across releases, but the last entry is for
the 2.9 release.

Do we still want to maintain this? If so, could we somehow include it in
the release process?

Also, we should add the missing entries for 2.10 and 2.11; is there any script I
can use to generate them?

D.


kafka 0.9 support

2019-04-01 Thread David Morávek
Hello,

is there still a reason to keep Kafka 0.9 support? It unfortunately adds a
lot of complexity to the KafkaIO implementation.

Kafka 0.9 was released in Nov 2015.

My first shot at removing Kafka 0.9 support would remove the second consumer,
which is used for fetching offsets.

WDYT? Is this support worth keeping?

https://github.com/apache/beam/pull/8186

D.


Re: kafka 0.9 support

2019-04-03 Thread David Morávek
I'd say that the APIs we use in KafkaIO have been pretty much stable since the
0.10 release; all of the reflection-based compatibility adapters seem to be
aimed at the 0.9 release (which is 8 major releases behind the current Kafka
release).

We may take inspiration from Flink's Kafka connector
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>:
they maintain a separate Maven artifact for each supported Kafka API version.
This may be the best approach, as we can still share most of the codebase
between versions, have compile-time checks, and also run tests against all of
the supported versions.

I'm not really comfortable with reflection-based adapters
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java>,
as they seem fragile and don't provide compile-time checks.
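
To make the fragility concrete, here is a minimal, hypothetical sketch of what
a reflection-based compatibility shim looks like in general (plain
java.lang.reflect, not necessarily how ConsumerSpEL is actually written; the
fallback behaviour is made up for illustration). The point is that a missing
method on an old client only surfaces at runtime:

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

/** Illustrative only: looks up the consumer's offsetsForTimes(Map) reflectively. */
class ReflectiveOffsetFetcher {

  private final Object consumer;        // a Kafka consumer, deliberately typed as Object
  private final Method offsetsForTimes; // null when the client on the classpath is too old

  ReflectiveOffsetFetcher(Object consumer) {
    this.consumer = consumer;
    Method method = null;
    try {
      // offsetsForTimes(Map) only exists in newer clients; with reflection we
      // find that out here, at runtime, instead of at compile time.
      method = consumer.getClass().getMethod("offsetsForTimes", Map.class);
    } catch (NoSuchMethodException e) {
      // Old client: silently degrade (easy to get subtly wrong).
    }
    this.offsetsForTimes = method;
  }

  @SuppressWarnings("unchecked")
  Map<Object, Object> offsetsForTimes(Map<?, Long> timestampsToSearch) throws Exception {
    if (offsetsForTimes == null) {
      return Collections.emptyMap();
    }
    return (Map<Object, Object>) offsetsForTimes.invoke(consumer, timestampsToSearch);
  }
}

With per-version artifacts, the same call would simply fail to compile against
an old client, which is the behaviour I'd prefer.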

On Tue, Apr 2, 2019 at 11:27 PM Austin Bennett 
wrote:

> I withdraw my concern -- checked on info on the cluster I will eventually
> access.  It is on 0.8, so I was speaking too soon.  Can't speak to rest of
> user base.
>
> On Tue, Apr 2, 2019 at 11:03 AM Raghu Angadi  wrote:
>
>> Thanks to David Morávek for pointing out possible improvement to KafkaIO
>> for dropping support for 0.9 since it avoids having a second consumer just
>> to fetch latest offsets for backlog.
>>
>> Ideally we should be dropping 0.9 support for next major release, in fact
>> better to drop versions before 0.10.1 at the same time. This would further
>> reduce reflection based calls for supporting multiple versions. If the
>> users still on 0.9 could stay on current stable release of Beam, dropping
>> would not affect them. Otherwise, it would be good to hear from them about
>> how long we need to keep support for old versions.
>>
>> I don't think it is good idea to have multiple forks of KafkaIO in the
>> same repo. If we do go that route, we should fork the entire kafka
>> directory and rename the main class KafkaIO_Unmaintained :).
>>
>> IMHO, so far, additional complexity for supporting these versions is not
>> that bad. Most of it is isolated to ConsumerSpEL.java & ProducerSpEL.java.
>> My first preference is dropping support for deprecated versions (and a
>> deprecate a few more versions, may be till the version that added
>> transactions around 0.11.x I think).
>>
>> I haven't looked into what's new in Kafka 2.x. Are there any features
>> that KafkaIO should take advantage of? I have not noticed our existing code
>> breaking. We should certainly certainly support latest releases of Kafka.
>>
>> Raghu.
>>
>> On Tue, Apr 2, 2019 at 10:27 AM Mingmin Xu  wrote:
>>
>>>
>>> We're still using Kafka 0.10 a lot, similar as 0.9 IMO. To expand
>>> multiple versions in KafkaIO is quite complex now, and it confuses users
>>> which is supported / which is not. I would prefer to support Kafka 2.0+
>>> only in the latest version. For old versions, there're some options:
>>> 1). document Kafka-Beam support versions, like what we do in FlinkRunner;
>>> 2). maintain separated KafkaIOs for old versions;
>>>
>>> 1) would be easy to maintain, and I assume there should be no issue to
>>> use Beam-Core 3.0 together with KafkaIO 2.0.
>>>
>>> Any thoughts?
>>>
>>> Mingmin
>>>
>>> On Tue, Apr 2, 2019 at 9:56 AM Reuven Lax  wrote:
>>>
>>>> KafkaIO is marked as Experimental, and the comment already warns that
>>>> 0.9 support might be removed. I think that if users still rely on Kafka 0.9
>>>> we should leave a fork (renamed) of the IO in the tree for 0.9, but we can
>>>> definitely remove 0.9 support from the main IO if we want, especially if
>>>> it's complicated changes to that IO. If we do though, we should fail with a
>>>> clear error message telling users to use the Kafka 0.9 IO.
>>>>
>>>> On Tue, Apr 2, 2019 at 9:34 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> > How are multiple versions of Kafka supported? Are they all in one
>>>>> client, or is there a case for forks like ElasticSearchIO?
>>>>>
>>>>> They are supported in one client but we have additional “ConsumerSpEL”
>>>>> adapter which unifies interface difference among different Kafka client
>>>>> versions (mostly to support old ones 0.9-0.10.0).
>>>>>
>>>>> On the other hand, we warn user in Javadoc of KafkaIO (which is
>>>>> Unstable, btw) by the following:
>>>

Re: [ANNOUNCE] New committer announcement: Mark Liu

2019-05-09 Thread David Morávek
Congrats! 

D.

Sent from my iPhone

> On 9 May 2019, at 10:07, Reuven Lax  wrote:
> 
> Congratulations!
> 
>> On Thu, May 9, 2019 at 5:14 AM Etienne Chauchot  wrote:
>> Congrats !
>>> Le lundi 25 mars 2019 à 10:55 -0700, Chamikara Jayalath a écrit :
>>> Congrats Mark!
>>> 
 On Mon, Mar 25, 2019 at 10:50 AM Alexey Romanenko 
  wrote:
 Congratulations, Mark!
 
> On 25 Mar 2019, at 18:36, Mark Liu  wrote:
> 
> Thank you all! It's a great pleasure to work on Beam!
> 
> Mark
> 
>> On Mon, Mar 25, 2019 at 10:18 AM Robin Qiu  wrote:
>> Congratulations, Mark!
>> 
>>> On Mon, Mar 25, 2019 at 9:31 AM Udi Meiri  wrote:
>>> Congrats Mark!
>>> 
 On Mon, Mar 25, 2019 at 9:24 AM Ahmet Altay  wrote:
 Congratulations, Mark! 🎉
 
> On Mon, Mar 25, 2019 at 7:24 AM Tim Robertson 
>  wrote:
> Congratulations Mark!
> 
> 
>> On Mon, Mar 25, 2019 at 3:18 PM Michael Luckey  
>> wrote:
>> Nice! Congratulations, Mark.
>> 
>>> On Mon, Mar 25, 2019 at 2:42 PM Katarzyna Kucharczyk 
>>>  wrote:
>>> Congratulations, Mark! 🎉
>>> 
 On Mon, Mar 25, 2019 at 11:24 AM Gleb Kanterov  
 wrote:
 Congratulations!
 
> On Mon, Mar 25, 2019 at 10:23 AM Łukasz Gajowy 
>  wrote:
> Congrats! :)
> 
> 
> 
> pon., 25 mar 2019 o 08:11 Aizhamal Nurmamat kyzy 
>  napisał(a):
>> Congratulations, Mark!
>> 
>>> On Sun, Mar 24, 2019 at 23:18 Pablo Estrada 
>>>  wrote:
>>> Yeaah  Mark! : ) Congrats : D
>>> 
 On Sun, Mar 24, 2019 at 10:32 PM Yifan Zou 
  wrote:
 Congratulations Mark!
 
> On Sun, Mar 24, 2019 at 10:25 PM Connell O'Callaghan 
>  wrote:
> Well done congratulations Mark!!! 
> 
>> On Sun, Mar 24, 2019 at 10:17 PM Robert Burke 
>>  wrote:
>> Congratulations Mark! 🎉
>> 
>>> On Sun, Mar 24, 2019, 10:08 PM Valentyn Tymofieiev 
>>>  wrote:
>>> Congratulations, Mark!
>>> 
>>> Thanks for your contributions, in particular for your 
>>> efforts to parallelize test execution for Python SDK and 
>>> increase the speed of Python precommit checks. 
>>> 
 On Sun, Mar 24, 2019 at 9:40 PM Kenneth Knowles 
  wrote:
 Hi all,
 
 Please join me and the rest of the Beam PMC in welcoming a 
 new committer: Mark Liu.
 
 Mark has been contributing to Beam since late 2016! He has 
 proposed 100+ pull requests. Mark was instrumental in 
 expanding test and infrastructure coverage, especially for 
 Python. In consideration of Mark's contributions, the Beam 
 PMC trusts Mark with the responsibilities of a Beam 
 committer [1].
 
 Thank you, Mark, for your contributions.
 
 Kenn
 
 [1] 
 https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>> -- 
>> 
>> Aizhamal Nurmamat kyzy
>> Open Source Program Manager
>> 646-355-9740 Mobile
>> 601 North 34th Street, Seattle, WA 98103
>> 
>> 
 
 


Re: [VOTE] Vendored dependencies release process

2019-07-06 Thread David Morávek
+1

Sent from my iPhone

> On 6 Jul 2019, at 11:25, Lukasz Cwik  wrote:
> 
> +1
> 
>> On Wed, Jul 3, 2019 at 10:24 AM Jens Nyman  wrote:
>> +1
>> 
>> On 2019/07/02 23:49:10, Lukasz Cwik  wrote: 
>> > Please vote based on the vendored dependencies release process as> 
>> > discussed[1] and documented[2].> 
>> > 
>> > Please vote as follows:> 
>> > +1: Adopt the vendored dependency release process> 
>> > -1: The vendored release process needs to change because ...> 
>> > 
>> > Since many people in the US may be out due to the holiday schedule, I'll> 
>> > try to close the vote and tally the results on July 9th so please vote> 
>> > before then.> 
>> > 
>> > 1:> 
>> > https://lists.apache.org/thread.html/e2c49a5efaee2ad416b083fbf3b9b6db60fdb04750208bfc34cecaf0@%3Cdev.beam.apache.org%3E>
>> >  
>> > 2: https://s.apache.org/beam-release-vendored-artifacts> 
>> >


Re: [ANNOUNCE] New committer: Jan Lukavský

2019-07-31 Thread David Morávek
Congratulations Jan, well deserved! ;)

D.

On Wed, Jul 31, 2019 at 10:17 AM Ryan Skraba  wrote:

> Congratulations Jan!
>
> On Wed, Jul 31, 2019 at 10:10 AM Ismaël Mejía  wrote:
> >
> > Hi,
> >
> > Please join me and the rest of the Beam PMC in welcoming a new
> > committer: Jan Lukavský.
> >
> > Jan has been contributing to Beam for a while, he was part of the team
> > that contributed the Euphoria DSL extension, and he has done
> > interesting improvements for the Spark and Direct runner. He has also
> > been active in the community discussions around the Beam model and
> > other subjects.
> >
> > In consideration of Jan's contributions, the Beam PMC trusts him with
> > the responsibilities of a Beam committer [1].
> >
> > Thank you, Jan, for your contributions and looking forward to many more!
> >
> > Ismaël, on behalf of the Apache Beam PMC
> >
> > [1] https://beam.apache.org/committer/committer
>


Re: [ANNOUNCE] New committer: Robert Burke

2019-08-01 Thread David Morávek
Congratulations Robert!

On Thu, Aug 1, 2019 at 8:44 PM Maximilian Michels  wrote:

> Go Robert! ;) Congrats
>
> On 30.07.19 19:38, Mark Liu wrote:
> > Congratulations Robert!
> >
> > On Thu, Jul 18, 2019 at 9:49 AM 송원욱  > > wrote:
> >
> > Congrats Robert!
> >
> >
> >   Wonook
> >
> >
> >
> > 2019년 7월 18일 (목) 오전 6:47, Kyle Weaver  > >님이 작성:
> >
> > +1 to faster Go SDK iteration! Well-deserved, Rebo
> >
> > Kyle Weaver | Software Engineer | github.com/ibzib
> >  | kcwea...@google.com
> >  | +1650203
> >
> >
> > On Wed, Jul 17, 2019 at 2:44 PM Robert Burke  > > wrote:
> >
> > Thanks all! Hopefully this does mean reduced latency to
> > merge when folks send me Go SDK reviews. Let's get Beam
> GOing!
> >
> > On Wed, Jul 17, 2019, 11:22 AM Melissa Pashniak
> > mailto:meliss...@google.com>> wrote:
> >
> >
> > Congratulations!
> >
> >
> > On Wed, Jul 17, 2019 at 6:06 AM Alexey Romanenko
> >  > > wrote:
> >
> > Congratulations, Robert!
> >
> > > On 17 Jul 2019, at 14:49, Tim Robertson
> > >  > > > wrote:
> > >
> > > Congratulations Robert!
> > >
> > > On Wed, Jul 17, 2019 at 2:47 PM Gleb Kanterov
> > > mailto:g...@spotify.com>>
> wrote:
> > >
> > > Congratulations, Robert!
> > >
> > > On Wed, Jul 17, 2019 at 1:50 PM Robert
> > > Bradshaw  > > > wrote:
> > >
> > > Congratulations!
> > >
> > > On Wed, Jul 17, 2019, 12:56 PM Katarzyna
> > > Kucharczyk  > > > wrote:
> > >
> > > Congratulations! :)
> > >
> > > On Wed, Jul 17, 2019 at 12:46 PM
> > > Michał Walenia
> > >  > > >
> > > wrote:
> > >
> > > Congratulations, Robert! :)
> > >
> > > On Wed, Jul 17, 2019 at 12:45 PM
> > > Łukasz Gajowy  > > >
> wrote:
> > >
> > > Congratulations! :)
> > >
> > > śr., 17 lip 2019 o
> > > 04:30 Rakesh Kumar
> > >  > > >
> > > napisał(a):
> > >
> > > Congrats Rob!!!
> > >
> > > On Tue, Jul 16, 2019 at
> > > 10:24 AM Ahmet Altay
> > >  > > >
> > > wrote:
> > >
> > > Hi,
> > >
> > > Please join me and the
> > > rest of the Beam PMC
> > > in welcoming a
> > > new committer: Robert
> > > Burke.
> > >
> > > Robert has been
> > > contributing to Beam
> > > and actively involved
> > > in the community for
> > > over a year. He has
> > > been actively working
> > > on Go SDK, helping
> > > users, and making it
> > > easier for others to
> > > contribute [1].
> > >
> > > In consideration of
> > > Robert's
> > > contributions, the
> >

Re: Support ZetaSQL as a new SQL dialect in BeamSQL

2019-08-04 Thread David Morávek
Hi Rui,

This is definitely an interesting topic! Can you please elaborate a little
bit more on the benefits this will bring to the end user? All the
documents only cover technical details, and I'm still not sure what you're
trying to achieve product-wise.

Best,
D.

On Sun, Aug 4, 2019 at 8:07 PM Rui Wang  wrote:

> I created a google doc to explain basic design on Beam ZetaSQL:
> https://docs.google.com/document/d/14Yi4oEMzqS3n9-LfSNi6Q6kQpEP3gWTHzX0HxqUksdc/edit?usp=sharing
>
>
>
> -Rui
>
> On Sun, Aug 4, 2019 at 10:02 AM Rui Wang  wrote:
>
>> Thanks Manu for you feedback! Some comments inlined:
>>
>>
>> On Sat, Aug 3, 2019 at 8:41 PM Manu Zhang 
>> wrote:
>>
>>> A question to the community, does the size of the change require any
 process besides the usual PR reviews?

>>>
>>> I think so. This is a big change and has come as kind of a surprise
>>> (sorry if I've missed previous discussions).
>>>
>>> Rui, could you explain more on how things will play out between BeamSQL
>>> and ZetaSQL (A design doc including the pluggable interface would be
>>> perfect).
>>>
>>
>> I see. I will have a document about some basic idea on Beam ZetaSQL (this
>> is my way to call "ZetaSQL as a SQL dialect in BeamSQL", and I usually use
>> Beam CalciteSQL to refer to Calcite's SQL dialect.).
>>
>> At least from users perspective, it's simple to use: setup planner name
>> in BeamSqlPipelineOptions
>> 
>>  and
>> BeamSQL will initialize different planners: either Calcite or ZetaSQL is
>> supported now.
>>
>>
>>> From GitHub, ZetaSQL is mainly in C++ so what you are doing is a port or
>>> a connector to ZetaSQL? Do we need to depend on
>>> https://github.com/google/zetasql ? ZetaSQL looks interesting but I
>>> could barely find any doc for end users.
>>>
>>
>> ZetaSQL provides a Java interface which calls c++ binary through JNI. For
>> using ZetaSQL in BeamSQL, we only need to depend on ZetaSQL jars in maven
>> central (https://mvnrepository.com/search?q=zetasql). These jars
>> contains all we need to call ZetaSQL analyzer by Java.
>>
>>
>>> Also, I'd prefer the PR to be split into two, one for the pluggable
>>> interface and one for the ZetaSQL.
>>>
>>> Pluggable planner is already a separate PR merged before:
>> https://github.com/apache/beam/pull/7745
>>
>>
>> -Rui
>>
>>
>>
>>> Thanks,
>>> Manu
>>>
>>>
>>>
>>> On Sat, Aug 3, 2019 at 10:06 AM Ahmet Altay  wrote:
>>>
 Thank you Rui for the heads up.

 A question to the community, does the size of the change require any
 process besides the usual PR reviews?

 On Fri, Aug 2, 2019 at 10:23 AM Rui Wang  wrote:

> Hi community,
>
> I have been working on supporting ZetaSQL[1] as a SQL dialect in
> BeamSQL. ZetaSQL is a SQL analyzer open sourced by Google. Here is
> ZetaSQL's documentation[2].
>
> Briefly, the design of integrating ZetaSQL with BeamSQL is: I made a
> plugable query planner interface in BeamSQL, and we can easily plug in a
> new planner[3] (in my case, ZetaSQL planner). Actually anyone can add new
> planners by this way (e.g. PostgreSQL dialect).
>
> I want to contribute ZetaSQL planner and its related code(~10k) to
> Beam repo(#9210 ). This
> contribution barely touch existing Beam code (because the idea is plugable
> planner).
>
>
> *Acknowledgement*
> Thanks to all the people who provided help during Beam ZetaSQL
> development: Matthew Brown, Brian Hulette, Andrew Pilloud, Kenneth 
> Knowles,
> Anton Kedin and Mikhail Gryzykhin. This list is not exhausted and also
> thanks to contributions which are not listed.
>
>
> [1]: https://github.com/google/zetasql
> [2]: https://github.com/google/zetasql/tree/master/docs
> [3]:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
>
>
> -Rui
>



Re: [ANNOUNCE] New committer: Kyle Weaver

2019-08-06 Thread David Morávek
Congratulations Kyle!!

Sent from my iPhone

> On 6 Aug 2019, at 18:47, Anton Kedin  wrote:
> 
> Congrats!
> 
>> On Tue, Aug 6, 2019, 9:37 AM Ankur Goenka  wrote:
>> Congratulations Kyle!
>> 
>>> On Tue, Aug 6, 2019 at 9:35 AM Ahmet Altay  wrote:
>>> Hi,
>>> 
>>> Please join me and the rest of the Beam PMC in welcoming a new committer: 
>>> Kyle Weaver.
>>> 
>>> Kyle has been contributing to Beam for a while now. And in that time period 
>>> Kyle got the portable spark runner feature complete for batch processing. 
>>> [1] 
>>> 
>>> In consideration of Kyle's contributions, the Beam PMC trusts him with the 
>>> responsibilities of a Beam committer [2].
>>> 
>>> Thank you, Kyle, for your contributions and looking forward to many more!
>>> 
>>> Ahmet, on behalf of the Apache Beam PMC
>>> 
>>> [1] 
>>> https://lists.apache.org/thread.html/c43678fc24c9a1dc9f48c51c51950aedcb9bc0fd3b633df16c3d595a@%3Cuser.beam.apache.org%3E
>>> [2] 
>>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Cassandra flaky on Jenkins?

2019-09-03 Thread David Morávek
I’m running into these failures too

D.

Sent from my iPhone

> On 3 Sep 2019, at 14:34, Jean-Baptiste Onofré  wrote:
> 
> Hi,
> 
> Let me take a look. Do you always have this issue on Jenkins or randomly ?
> 
> Regards
> JB
> 
>> On 03/09/2019 14:19, Alex Van Boxel wrote:
>> Hi, is it only me that are bumping on the flaky Cassandra on Jenkins? I
>> like to get my PR approved but I can't get past the Cassandra error...
>> 
>>  * org.apache.beam.sdk.io.cassandra.CassandraIOTest.classMethod
>>
>> 
>> 
>> 
>> 
>>  _/
>> _/ Alex Van Boxel
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: Cassandra flaky on Jenkins?

2019-09-03 Thread David Morávek
yes, that looks similar. example:

https://github.com/apache/beam/pull/9464

D.

> On 3 Sep 2019, at 15:18, Jean-Baptiste Onofré  wrote:
> 
> Thanks David,
> 
> the build is running on my machine to see if I can reproduce locally.
> 
> It sounds like https://issues.apache.org/jira/browse/BEAM-7355 right ?
> 
> Regards
> JB
> 
>> On 03/09/2019 15:11, David Morávek wrote:
>> I’m running into these failures too
>> 
>> D.
>> 
>> Sent from my iPhone
>> 
>>> On 3 Sep 2019, at 14:34, Jean-Baptiste Onofré  wrote:
>>> 
>>> Hi,
>>> 
>>> Let me take a look. Do you always have this issue on Jenkins or randomly ?
>>> 
>>> Regards
>>> JB
>>> 
>>>> On 03/09/2019 14:19, Alex Van Boxel wrote:
>>>> Hi, is it only me that are bumping on the flaky Cassandra on Jenkins? I
>>>> like to get my PR approved but I can't get past the Cassandra error...
>>>> 
>>>> * org.apache.beam.sdk.io.cassandra.CassandraIOTest.classMethod
>>>>   
>>>> <https://builds.apache.org/job/beam_PreCommit_Java_Phrase/1300/testReport/junit/org.apache.beam.sdk.io.cassandra/CassandraIOTest/classMethod/>
>>>> 
>>>> 
>>>> 
>>>> _/
>>>> _/ Alex Van Boxel
>>> 
>>> -- 
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: Cassandra flaky on Jenkins?

2019-09-04 Thread David Morávek
Hi, I'm temporarily disabling the test
<https://github.com/apache/beam/pull/9470> until BEAM-8025
<https://jira.apache.org/jira/browse/BEAM-8025> is resolved (marking it as a
blocker for 2.16), so we can unblock ongoing pull requests.

Best,
D.

On Tue, Sep 3, 2019 at 3:57 PM Jean-Baptiste Onofré  wrote:

> Hi Max,
>
> yup, I'm starting the investigation.
>
> I keep you posted.
>
> Regards
> JB
>
> On 03/09/2019 15:34, Maximilian Michels wrote:
> > The newest incarnation of this is here:
> > https://jira.apache.org/jira/browse/BEAM-8025
> >
> > Would be good if you could take a look JB.
> >
> > Thanks,
> > Max
> >
> > On 03.09.19 15:32, David Morávek wrote:
> >> yes, that looks similar. example:
> >>
> >> https://github.com/apache/beam/pull/9464
> >>
> >> D.
> >>
> >> On 3 Sep 2019, at 15:18, Jean-Baptiste Onofré  >> <mailto:j...@nanthrax.net>> wrote:
> >>
> >>> Thanks David,
> >>>
> >>> the build is running on my machine to see if I can reproduce locally.
> >>>
> >>> It sounds like https://issues.apache.org/jira/browse/BEAM-7355 right ?
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 03/09/2019 15:11, David Morávek wrote:
> >>>> I’m running into these failures too
> >>>>
> >>>> D.
> >>>>
> >>>> Sent from my iPhone
> >>>>
> >>>>> On 3 Sep 2019, at 14:34, Jean-Baptiste Onofré  >>>>> <mailto:j...@nanthrax.net>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> Let me take a look. Do you always have this issue on Jenkins or
> >>>>> randomly ?
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>>> On 03/09/2019 14:19, Alex Van Boxel wrote:
> >>>>>> Hi, is it only me that are bumping on the flaky Cassandra on
> >>>>>> Jenkins? I
> >>>>>> like to get my PR approved but I can't get past the Cassandra
> >>>>>> error...
> >>>>>>
> >>>>>> * org.apache.beam.sdk.io.cassandra.CassandraIOTest.classMethod
> >>>>>>   <
> https://builds.apache.org/job/beam_PreCommit_Java_Phrase/1300/testReport/junit/org.apache.beam.sdk.io.cassandra/CassandraIOTest/classMethod/
> >
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> _/
> >>>>>> _/ Alex Van Boxel
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> jbono...@apache.org <mailto:jbono...@apache.org>
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org <mailto:jbono...@apache.org>
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


[DISCUSS] Supporting multiple Flink versions vs. tech debt

2019-09-07 Thread David Morávek
Hello,

we currently have an open PR for Flink 1.9
<https://github.com/apache/beam/pull/9296>, which greatly improves the
runner for the batch use case. Once the PR gets merged, we would be
supporting the 5 latest major versions of Flink, which obviously comes with a
high maintenance price and makes future development harder (there are already
sub-optimal parts due to compatibility with previous versions). Thomas and
Max expressed the need to address this issue with the current release.

Let's break down possible solutions to the problem.

*1) Current solution*

Currently we maintain separate build for each version. The project
structure looks as follows:

flink/
+ 1.5/
   + src/  # implementation of classes that differ between versions
   - build.gradle
+ 1.6/
   + build.gradle  # the version is backward compatible, so it can reuse "overrides" from 1.5
+ 1.7/
   + build.gradle  # the version is backward compatible, so it can reuse "overrides" from 1.5
+ 1.8/
   + src/  # implementation of classes that differ between versions
   - build.gradle
+ 1.9/
   + src/  # implementation of classes that differ between versions
   - build.gradle
+ src/  # common source, shared among runner versions
- flink_runner.gradle  # included by each version's build.gradle

The problem with this structure is that we always need to copy all of the
version-specific classes between backward-incompatible versions, which
results in *duplicate files* (we cannot simply override a single file,
because it wouldn't compile due to duplicate classes).

*2) Symlink duplicates*

Maybe we can simply symlink duplicates between versions and only override
the files that need to be changed?

*3) Adjusting the gradle build*

Currently a version build looks something like this (this one is for 1.7.x
version):

project.ext {
  // Set the version of all Flink-related dependencies here.
  flink_version = '1.7.2'
  // Main source directory and Flink version specific code.
  main_source_dirs = ["$basePath/src/main/java", "../1.5/src/main/java"]
  test_source_dirs = ["$basePath/src/test/java", "../1.5/src/test/java"]
  main_resources_dirs = ["$basePath/src/main/resources"]
  test_resources_dirs = ["$basePath/src/test/resources"]
  archives_base_name = 'beam-runners-flink-1.7'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"

It basically says: take the common source and append the version-specific
implementations from the 1.5 version. Let's say we want to override a single
file for 1.8. We would need to copy everything from 1.5/src, and the build file
would look as follows:

/* All properties required for loading the Flink build script */
project.ext {
  // Set the version of all Flink-related dependencies here.
  flink_version = '1.8.0'
  // Main source directory and Flink version specific code.
  main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
  test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
  main_resources_dirs = ["$basePath/src/main/resources"]
  test_resources_dirs = ["$basePath/src/test/resources"]
  archives_base_name = 'beam-runners-flink-1.8'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"

For simplicity, let's only focus on *main_source_dirs*. What we really want
to do is to tell the build to use everything from 1.5 and override a
single class (e.g. CoderTypeSerializer).

// Copy the 1.5 sources plus this version's overrides into the build directory;
// on duplicates, the override (listed last) wins.
def copyOverrides = tasks.register('copyOverrides', Copy) {
  it.from '../1.5/src/', './src'
  it.into "${project.buildDir}/flink-overrides/src"
  it.duplicatesStrategy DuplicatesStrategy.INCLUDE // the last duplicate file 'wins'
}

compileJava.dependsOn copyOverrides

project.ext {
  main_source_dirs = ["$basePath/src/main/java",
      "${project.buildDir}/flink-overrides/src/main/java"]
}

This would copy all overrides into the build directory and, in case of a
duplicate, pick the latest one. Then the build would simply compile
classes from the newly created Java files in the build directory.

*4) Maintaining last 3 major versions only*

I recall that the Flink community only supports the 3 latest major versions
(please correct me if I'm mistaken). I suggest that *Beam does the same*.
There is already an open BEAM-7962 that
suggests dropping the 1.5 & 1.6 versions. Maybe this would allow us to keep the
current structure with a bearable amount of technical debt?

Personally I'm in favor of *4)* combined with *3)*.

What do you think? Do you have any other suggestions how to solve this?

Thanks,
D.


Re: [DISCUSS] Supporting multiple Flink versions vs. tech debt

2019-09-09 Thread David Morávek
 > + *1.5_to_1.7_utils/*/ # common pluggable parts that work for 1.5
>> > > through 1.7/
>> > > - build.gradle # maybe makes sense to cross-compile and unit test
>> > > against 1.5 through 1.7, though dirs below give coverage too
>> > > + *src/*
>> > >
>> - org/apache/beam/runners/flink/v1_5_to_1_7/*CoderTypeSerializerFactory.java*
>> > >
>> > >
>> - 
>> org/apache/beam/runners/flink/v1_5_to_1_7/*EncodedValueTypeSerializerFactory.java*
>> > >
>> > > + *1.5/*
>> > > - build.gradle
>> > > + *src/ */# implementation of FlinkRunner compatible with 1.5,
>> > > could have some of its own logic plugged in to FlinkRunnerBuilder, and
>> > > some 1.5_to_1.7 utils/*
>> > > *
>> > >- org/apache/beam/runners/flink/*FlinkRunner.java */#
>> > > FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new
>> > > /v1_5_to_1_7./EncodedValueTypeSerializerFactory())/
>> > > + *1.6/*
>> > > - build.gradle
>> > > + *src/ */# implementation of FlinkRunner compatible with 1.6,
>> > > actually has none of its own logic but it could/*
>> > > *
>> > >- org/apache/beam/runners/flink/*FlinkRunner.java */#
>> > > FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new
>> > > /v1_5_to_1_7./EncodedValueTypeSerializerFactory())/
>> > > + *1.7/*
>> > > + *src/ */# implementation of FlinkRunner compatible with 1.7,
>> > > actually has none of its own logic but it could/*
>> > > *
>> > >- org/apache/beam/runners/flink/*FlinkRunner.java */#
>> > > FlinkRunnerBuilder(new v1_5_to_1_7.CoderTypeSerializerFactory(), new
>> > > /v1_5_to_1_7./EncodedValueTypeSerializerFactory())/
>> > > + *1.8/*
>> > > - build.gradle
>> > > + *src/*
>> > >- org/apache/beam/runners/flink/*FlinkRunner.java */#
>> > > FlinkRunnerBuilder(new v1_8_CoderTypeSerializerFactory(), new
>> > > EncodedValueTypeSerializerFactory())/
>> > > - org/apache/beam/runners/flink/*v1_8/CoderTypeSerializerFactory.java*
>> > >
>> - org/apache/beam/runners/flink/*v1_8/EncodedValueTypeSerializerFactory.java*
>> > >
>> > >
>> > > Kenn
>> > >
>> > > On Sat, Sep 7, 2019 at 9:00 AM Lukasz Cwik > > > <mailto:lc...@google.com>> wrote:
>> > >
>> > > When we import the Beam code into Google, we also run into issues
>> > > where sometimes we need to transform parts of the code. During
>> > > import we use copybara[1] to do these transformations to the
>> source
>> > > which are more then just copy file X from some other path since
>> most
>> > > of the time we want to change only a few lines and this really
>> helps
>> > > reduce the maintenance pain. Unfortunately I don't see a Gradle
>> > > plugin for copybara but I do imagine there is a plugin that allows
>> > > one to run SED like expressions or other transformations instead
>> of
>> > > just maintaining duplicate copies of files.
>> > >
>> > > 1: https://github.com/google/copybara
>> > >
>> > > On Sat, Sep 7, 2019 at 3:37 AM David Morávek > > > <mailto:d...@apache.org>> wrote:
>> > >
>> > > Hello,
>> > >
>> > > we currently have an opened PR for Flink 1.9
>> > > <https://github.com/apache/beam/pull/9296>, which greatly
>> > > improves the runner for batch use-case. In case the PR gets
>> > > merged, we would be supporting 5 latest major versions of
>> Flink,
>> > > which obviously come with high maintenance price and makes
>> > > future development harder (there are already a sub-optimal
>> parts
>> > > due to compatibility with previous versions). Thomas and Max
>> > > expressed needs for addressing the issue with the current
>> > > release.
>> > >
>> > > Let's break down possible solution for the problem.
>> > >
>> > > *1) Current solution*
>> > > *
>> > > *
>> > > Currently we maintain separate build for each version. The
>> > > project structure looks as follows:

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread David Morávek
Hi,

Spark's GBK is currently implemented using `sortBy(key and
value).mapPartition(...)` for non-merging windowing, in order to support
large keys and large-scale shuffles. Merging windowing is implemented using
the standard GBK (the underlying Spark impl. uses ListCombiner + hash grouping),
which is by design unable to support large keys.

As Jan noted, the problem with mapPartition is that its UDF receives an
Iterator. The only option here is to wrap this iterator in one that spills to
disk once an internal buffer is exceeded (the approach suggested by
Reuven). This unfortunately comes with a cost in some cases. The best
approach would be to somehow determine that the user wants multiple iterations
and only then wrap it in a "re-iterator" if necessary (see the sketch below).
Does anyone have any ideas on how to approach this?
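
To illustrate the "re-iterator" idea, here is a rough sketch (the names are
mine, and it only caches in memory; a real implementation would spill the
cache to disk above a size threshold). The first pass streams from the
underlying one-shot Iterator while recording values; later passes replay the
cache:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Wraps a one-shot Iterator into an Iterable that can be iterated repeatedly. */
class CachingReiterable<T> implements Iterable<T> {

  private final Iterator<T> source;
  private final List<T> cache = new ArrayList<>(); // a real impl. would spill to disk
  private boolean sourceExhausted = false;

  CachingReiterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        if (pos < cache.size()) {
          return true; // replay from the cache
        }
        if (!sourceExhausted && source.hasNext()) {
          cache.add(source.next()); // first pass: pull from the source and remember
          return true;
        }
        sourceExhausted = true;
        return false;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return cache.get(pos++);
      }
    };
  }
}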

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:

> The Beam API was written to support multiple iterations, and there are
> definitely transforms that do so. I believe that CoGroupByKey may do this
> as well with the resulting iterator.
>
> I know that the Dataflow runner is able to handles iterators larger than
> available memory by paging them in from shuffle, which still allows for
> reiterating. It sounds like Spark is less flexible here?
>
> Reuven
>
> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>
>> +dev  
>>
>> Lukasz, why do you think that users expect to be able to iterate multiple
>> times grouped elements? Besides that it obviously suggests the 'Iterable'?
>> The way that spark behaves is pretty much analogous to how MapReduce used
>> to work - in certain cases it calles repartitionAndSortWithinPartitions and
>> then does mapPartition, which accepts Iterator - that is because internally
>> it merge sorts pre sorted segments. This approach enables to GroupByKey
>> data sets that are too big to fit into memory (per key).
>>
>> If multiple iterations should be expected by users, we probably should:
>>
>>  a) include that in @ValidatesRunner tests
>>
>>  b) store values in memory on spark, which will break for certain
>> pipelines
>>
>> Because of (b) I think that it would be much better to remove this
>> "expectation" and clearly document that the Iterable is not supposed to be
>> iterated multiple times.
>>
>> Jan
>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>
>> I pretty much think so, because that is how Spark works. The Iterable
>> inside is really an Iterator, which cannot be iterated multiple times.
>>
>> Jan
>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>
>> Jan, in Beam users expect to be able to iterate the GBK output multiple
>> times even from within the same ParDo.
>> Is this something that Beam on Spark Runner never supported?
>>
>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>>
>>> Hi Gershi,
>>>
>>> could you please outline the pipeline you are trying to execute?
>>> Basically, you cannot iterate the Iterable multiple times in single ParDo.
>>> It should be possible, though, to apply multiple ParDos to output from
>>> GroupByKey.
>>>
>>> Jan
>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I want to iterate multiple times on the Iterable (the output of
>>> GroupByKey transformation)
>>>
>>> When my Runner is SparkRunner, I get an exception:
>>>
>>>
>>>
>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>>> iterated more than once,otherwise there could be data lost
>>>
>>> at
>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>
>>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>>
>>>
>>>
>>>
>>>
>>> I understood I can branch the pipeline after GroupByKey into multiple
>>> transformation and iterate in each of them once on the Iterable.
>>>
>>>
>>>
>>> Is there a better way for that?
>>>
>>>
>>>
>>>
>>>
>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>
>>> Software Developer
>>>
>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>
>>> [image: Mail_signature_blue]
>>>
>>>
>>>
>>>


Re: ES 7.0 Support Development

2019-11-07 Thread David Morávek
Hi Zhong,

Just FYI, there is another ongoing effort to add ES 7 support.

https://github.com/apache/beam/pull/10025

you guys should get in touch ;)

D.

Sent from my iPhone

> On 7 Nov 2019, at 20:20, Zhong Chen  wrote:
> 
> 
> Hi all,
> 
> I have made a PR for adding ES 7.0 support here. However, the unit tests are 
> failing because for some reason the test cluster is not publishing http 
> endpoints correctly, which is leading to a connection refused exception. I am 
> still trying to figure that out. Any help would be much appreciated!
> 
> In addition, can someone take a look at my existing PR and provide some 
> feedback?
> 
> https://issues.apache.org/jira/browse/BEAM-5192
> 
> 
> 
> Zhong Chen
> Big Data and Analytics Cloud Consultant
> 650-502-0142
> zhongc...@google.com
> 
> 
> 
> 
> 
> 


Re: Full stream-stream join semantics

2019-11-26 Thread David Morávek
Hi,

I think what Jan has in mind would look something like this
, if
implemented in user code. Am I right?

D.


On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský  wrote:

>
> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>
>
>
> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský  wrote:
>
>> Hi Rui,
>>
>> > Hi Kenn, you think stateful DoFn based join can emit joined rows that
>> never to be retracted because in stateful DoFn case joined rows will be
>> controlled by timers and emit will be only once? If so I will agree with
>> it. Generally speaking, if only emit once is the factor of needing
>> retraction or not.
>>
>> that would imply buffering elements up until watermark, then sorting and
>> so reduces to the option a) again, is that true? This also has to deal with
>> allowed lateness, that would mean, that with allowed lateness greater than
>> zero, there can still be multiple firings and so retractions are needed.
>>
> Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded
> join where one of the join conditions is that elements are within event
> time distance d of one another. An element at time t will be saved until
> time t + 2d and then garbage collected. Every matching pair can be emitted
> immediately.
>
> OK, this might simplify things a little. Is there a design doc for that?
> If there are multiple LHS elements within event time distance from RHS
> element, which one should be joined? I suppose all of them, but that is not
> "(time-varying-)relational" join semantics. In that semantics only the last
> element must be joined, because that is how a (classical) relational
> database would see the relation at time T (the old record would have been
> overwritten and not be part of the output). Because of the time distance
> constraint this is different from the join I have in mind, because that
> simply joins every LHS element(s) to most recent RHS element(s) and vice
> versa, without any additional time constraints (that is the RHS "update"
> can happen arbitrarily far in past).
>
> Jan
>
>
> In the triggered CoGBK + join-product implementation, you do need
> retractions as a model concept. But you don't need full support, since they
> only need to be shipped as deltas and only from the CoGBK to the
> join-product transform where they are all consumed to create only positive
> elements. Again a delay is not required; this yields correct results with
> the "always" trigger.
>
> Neither case requires waiting or time sorting a whole buffer. The
> bi-temporal join requires something more, in a way, since you need to query
> by time range and GC time prefixes.
>
> Kenn
>
> Jan
>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>
>>
>>
>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský  wrote:
>>
>>>
>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>
>>>
>>>
>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský  wrote:
>>>
 I can put down a design document, but before that I need to clarify
 some things for me. I'm struggling to put all of this into a bigger
 picture. Sorry if the arguments are circulating, but I didn't notice any
 proposal of how to solve these. If anyone can disprove any of this logic it
 would be very much appreciated as I might be able to get from a dead end:

  a) in the bi-temporal join you can either buffer until watermark, or
 emit false data that has to be retracted

>>> This is not the case. A stateful DoFn based join can emit immediately
>>> joined rows that will never need to be retracted. The need for retractions
>>> has to do with CoGBK-based implementation of a join.
>>>
>>> I fail to see how this could work. If I emit joined rows immediately,
>>> without waiting for the watermark to pass, I can join two elements that don't
>>> belong to each other, because an element with a lower time distance can
>>> arrive later and should have been joined in place of the previously
>>> emitted one. That is a wrong result that has to be retracted. Or what am I
>>> missing?
>>>
>>
>> Hi Kenn, you think stateful DoFn based join can emit joined rows that
>> never to be retracted because in stateful DoFn case joined rows will be
>> controlled by timers and emit will be only once? If so I will agree with
>> it. Generally speaking, if only emit once is the factor of needing
>> retraction or not.
>>
>> In the past brainstorming, even having retractions ready, streaming join
>> with windowing are likely be implemented by a style of CoGBK + stateful
>> DoFn.
>>
>>
>>
>> I suggest that you work out the definition of the join you are interested
>>> in, with a good amount of mathematical rigor, and then consider the ways
>>> you can implement it. That is where a design doc will probably clarify
>>> things.
>>>
>>> Kenn
>>>
>>>  b) until retractions are 100% functional (and that is sort of holy
 grail for now), then the only solution is using a buffer holding data up to
 watermark *and then sort by ev

Re: Full stream-stream join semantics

2019-11-26 Thread David Morávek
Yes, in the batch case with long-term historical data, this would be O(n^2) as
it is basically a bubble sort. If you have a large # of updates for a single
key, this would be super expensive.

Kenn, can this be re-implemented with your solution?

On Tue, Nov 26, 2019 at 1:10 PM Jan Lukavský  wrote:

> Functionally yes. But this straightforward solution is not working for me
> for two main reasons:
>
>  - it either blows up state in the batch case, or the time complexity of the
> sort would be O(n^2) (and reprocessing several years of dense time-series data
> makes it a no go)
>
>  - it is not reusable for different time-ordering needs, because logic
> implemented purely in user space cannot be transferred to a different problem
> (there are two states needed, one for the buffer, the other for the user state)
> and extending DoFns does not work (we cannot create an abstract SortedDoFn,
> because of the state annotation definitions)
>
> Jan
> On 11/26/19 12:56 PM, David Morávek wrote:
>
> Hi,
>
> I think what Jan has in mind would look something like this
> <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if
> implemented in user code. Am I right?
>
> D.
>
>
> On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský  wrote:
>
>>
>> On 11/25/19 11:45 PM, Kenneth Knowles wrote:
>>
>>
>>
>> On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský  wrote:
>>
>>> Hi Rui,
>>>
>>> > Hi Kenn, you think stateful DoFn based join can emit joined rows that
>>> never to be retracted because in stateful DoFn case joined rows will be
>>> controlled by timers and emit will be only once? If so I will agree with
>>> it. Generally speaking, if only emit once is the factor of needing
>>> retraction or not.
>>>
>>> that would imply buffering elements up until watermark, then sorting and
>>> so reduces to the option a) again, is that true? This also has to deal with
>>> allowed lateness, that would mean, that with allowed lateness greater than
>>> zero, there can still be multiple firings and so retractions are needed.
>>>
>> Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded
>> join where one of the join conditions is that elements are within event
>> time distance d of one another. An element at time t will be saved until
>> time t + 2d and then garbage collected. Every matching pair can be emitted
>> immediately.
>>
>> OK, this might simplify things a little. Is there a design doc for that?
>> If there are multiple LHS elements within event time distance from RHS
>> element, which one should be joined? I suppose all of them, but that is not
>> "(time-varying-)relational" join semantics. In that semantics only the last
>> element must be joined, because that is how a (classical) relational
>> database would see the relation at time T (the old record would have been
>> overwritten and not be part of the output). Because of the time distance
>> constraint this is different from the join I have in mind, because that
>> simply joins every LHS element(s) to most recent RHS element(s) and vice
>> versa, without any additional time constraints (that is the RHS "update"
>> can happen arbitrarily far in past).
>>
>> Jan
>>
>>
>> In the triggered CoGBK + join-product implementation, you do need
>> retractions as a model concept. But you don't need full support, since they
>> only need to be shipped as deltas and only from the CoGBK to the
>> join-product transform where they are all consumed to create only positive
>> elements. Again a delay is not required; this yields correct results with
>> the "always" trigger.
>>
>> Neither case requires waiting or time sorting a whole buffer. The
>> bi-temporal join requires something more, in a way, since you need to query
>> by time range and GC time prefixes.
>>
>> Kenn
>>
>> Jan
>>> On 11/25/19 10:17 PM, Rui Wang wrote:
>>>
>>>
>>>
>>> On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský  wrote:
>>>
>>>>
>>>> On 11/25/19 7:47 PM, Kenneth Knowles wrote:
>>>>
>>>>
>>>>
>>>> On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský  wrote:
>>>>
>>>>> I can put down a design document, but before that I need to clarify
>>>>> some things for me. I'm struggling to put all of this into a bigger
>>>>> picture. Sorry if the arguments are circulating, but I didn't notice any
>>>>> proposal of how to solve these. If anyone can disprov

Re: Full stream-stream join semantics

2019-11-28 Thread David Morávek
Agreed with Jan. This kind of use case requires having incoming elements
ordered by timestamp. The only effective solution is to delegate sorting to the
runner, which is currently impossible. Introducing an "annotation" that would
guarantee event-time order looks like a nice, clean way to solve this. :+1:

I'd love to see this effort moving forward; are there any objections against
this that I'm not aware of (looking at the previous discussions, I didn't find
any)?

D.
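
As a rough illustration of the proposed annotation (its exact name, placement
and semantics were still under discussion at this point, so treat this sketch
as an assumption rather than a description of an existing API), a stateful DoFn
whose per-key state only makes sense when elements arrive in event-time order:

  class DeltaFn extends DoFn<KV<String, Long>, String> {

    // Per-key state that relies on event-time ordering, e.g. the previously
    // observed value. VarLongCoder is used purely for illustration.
    @StateId("last")
    private final StateSpec<ValueState<Long>> last = StateSpecs.value(VarLongCoder.of());

    @RequiresTimeSortedInput  // assumption: the runner delivers elements sorted by event time
    @ProcessElement
    public void process(ProcessContext c, @StateId("last") ValueState<Long> last) {
      Long previous = last.read();
      long current = c.element().getValue();
      // Because input is time-sorted, "previous" really is the previous value
      // in event time, not just in arrival order.
      c.output(c.element().getKey() + ": " + previous + " -> " + current);
      last.write(current);
    }
  }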

On Thu, Nov 28, 2019 at 10:46 AM Jan Lukavský  wrote:

> Hi Reza,
> On 11/28/19 8:16 AM, Reza Rokni wrote:
>
> Hi,
>
> With regards to the processing needed for sort:
> The first naive implementation of the prototype did a read and sort for
> every Timer that fired (timers were set to fire for every LHS element
> timestamp, a property of the use case we were looking at). This worked but
> was very slow, as you would expect, so we changed things to make use of
> bundle boundaries as a way to reduce the number of sorts, by storing the
> sorted list into a static map (Key-Window as key) for the duration of the
> bundle. It was very effective for the use case, but added a lot of
> technical debt and hard-to-figure-out potential bugs...
>
> Note that when you push the sorting from user code to the runner (even for
> streaming), then a much more efficient implementation appears, because you
> can read and sort all elements from the sort buffer *up to the input
> watermark*. This is a much bigger "hop" than per-element processing and is
> therefore very efficient even with no other optimizations in place. The
> problem is that in user code the actual input watermark is unknown (yes, that
> could be changed, we could add the value of the input watermark to
> OnTimerContext).
>
>
> With regards to memory needs:
> In our use case, while there were a lot of elements, the elements were
> small in size and even in batch mode we could process all of the data
> without OOM. But we would want a generalized solution not to have to rely
> on this property when in batch mode of course.
>
> +1
>
>
> Just a thought, Jan, as a temporary solution for your use case: would
> stripping down the element to just timestamp & join key allow the data to
> fit into memory for the batch processing mode? It would require more work
> afterwards to add back the other properties (an LHS and RHS pass, I think...),
> which could make it prohibitive...?
>
> Actually there are workarounds, yes. I'm looking for a generic solution,
> and because I have implemented the @RequiresTimeSortedInput annotation and
> I'm using it, I actually don't need any workarounds. :-) I just need a
> consensus to add this to master, because I don't (obviously) want to keep
> and maintain that outside Beam.
>
> Jan
>
>
> Cheers
> Reza
>
>
>
>
>
>
>
>
> On Thu, 28 Nov 2019 at 14:12, Kenneth Knowles  wrote:
>
>> Yes, I am suggesting to add more intelligent state data structures for
>> just that sort of join. I tagged Reza because his work basically does it,
>> but explicitly pulls a BagState into memory and sorts it. We just need to
>> avoid that. It is the sort of thing that already exists in some engines so
>> there's proof of concept :-). Jan makes the good point that executing the
>> same join in batch you wouldn't use the same algorithm, because the
>> disorder will be unbounded. In Beam you'd want a PTransform that expands
>> differently based on whether the inputs are bounded or unbounded.
>>
>> Kenn
>>
>> On Tue, Nov 26, 2019 at 4:16 AM David Morávek 
>> wrote:
>>
>>> Yes, in the batch case with long-term historical data, this would be O(n^2)
>>> as it is basically a bubble sort. If you have a large # of updates for a
>>> single key, this would be super expensive.
>>>
>>> Kenn, can this be re-implemented with your solution?
>>>
>>> On Tue, Nov 26, 2019 at 1:10 PM Jan Lukavský  wrote:
>>>
>>>> Functionally yes. But this straightforward solution is not working for
>>>> me for two main reasons:
>>>>
>>>>  - it either blows state in batch case or the time complexity of the
>>>> sort would be O(n^2) (and reprocessing several years of dense time-series
>>>> data makes it a no go)
>>>>
>>>>  - it is not reusable for different time-ordering needs, because the
>>>> logic implemented purely in user-space cannot be transferred to different
>>>> problem (there are two states needed, one for buffer, the other for
>>>> user-state) and extending DoFns does not work (cannot create abstract
>>>> SortedDoFn, because of th

Re: Update on push-down for SQL IOs.

2019-11-28 Thread David Morávek
Nice, this should bring a great performance improvement for SQL. Thanks for
your work!

On Thu, Nov 28, 2019 at 6:33 AM Kenneth Knowles  wrote:

> Nice! Thanks for the very thorough summary. I think this will be a really
> good thing for Beam. Most of the IO sources are very highly optimized for
> querying and will do it more efficiently than the Beam runner when the
> structure of the query matches. I'm really excited to see the performance
> measurements.
>
> I have a thought: your update did not mention a few extensions that we
> might consider: ParquetIO, CassandraIO/HBaseIO/BigTableIO (all should be
> about the same), JdbcIO, IcebergIO (doesn't exist yet, but is basically
> generalized schema-aware files as I understand it). Are these things you
> are thinking about doing, or would these be Jiras that could potentially be
> tagged "starter"? They seem complex but maybe your framework will make it
> feasible for someone with slightly less experience to implement new
> versions of what you have already finished?
>
> Kenn
>
> On Tue, Nov 26, 2019 at 12:19 PM Kirill Kozlov 
> wrote:
>
>> Hello everyone!
>>
>> I have been working on a push-down feature and would like to give a brief
>> update on what is done and is still under works.
>>
>> *Things that are done*:
>> General API for SQL IOs to provide information about what
>> filters/projects they support [1]:
>> - *Project* can be unsupported, supported with field reordering, and
>> supported without field reordering.
>> - *Predicate* is broken down into a conjunctive normal form (CNF) and
>> passed to a validator class to check what parts are supported or
>> unsupported by an IO.
>>
>> A Calcite rule [2] that checks for push-down support, constructs a new IO
>> source Rel [3] with pushed-down projects and filters when applicable, and
>> preserves unsupported filters/projects.
>>
>> BigQuery should perform push-down when running queries in DIRECT_READ
>> method [4].
>>
>> MongoDB project push-down support is in a PR [5] and predicate support
>> will be added soon.
>>
>>
>> *Things that are in progress:*
>> Documenting how developers can enable push-down for IOs that support it.
>>
>> Documenting certain limitation for BigQuery push-down (ex: comparing
>> values of 2 columns is not supported at the moment, so it is being
>> preserved in a Calc).
>>
>> Updating google-cloud-bigquerystorage to 0.117.0-beta. Earlier versions
>> have a gRPC message limit set to ~11MB, which may cause some pipelines to
>> break when reading from a table with rows larger than the limit.
>>
>> Adding some sort of performance tests to run continuously to
>> measure speed-up and detect regressions.
>>
>> Deciding how cost should be computed for the IO source Rel with push-down
>> [6]. Right now the following formula is used: cost of an IO without
>> push-down minus the normalized (between 0.0 and 1.0) benefit of a performed
>> push-down.
>> The challenge here is to make the change to the cost small enough to not
>> break join reordering, but large enough to make the optimizer favor
>> pushed-down IO.
>>
>>
>> If you have any suggestions/questions/concerns I would love to hear them.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java#L36
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
>> [3]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java
>> [4]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java#L128
>> [5] https://github.com/apache/beam/pull/10095
>> [6] https://github.com/apache/beam/pull/10060
>>
>> --
>> Kirill
>>
>


Re: [DISCUSS] BIP reloaded

2019-12-09 Thread David Morávek
Hi Jan,

I think this is pretty much what we currently do, just a little bit more
transparent for the community. If the process is standardized, it can open
doors for bigger contributions from people not familiar with the process.
Also, it's way easier to track the progress of BIPs than of documents linked
from the mailing list.

Big +1 ;)

D.

On Sun, Dec 8, 2019 at 12:42 PM Jan Lukavský  wrote:

> Hi,
>
> I'd like to revive a discussion that was taken some year and a half ago
> [1], which included a concept of "BIP" (Beam Improvement Proposal) - an
> equivalent of "FLIP" (flink), "KIP" (kafka), "SPIP" (spark), and so on.
>
> The discussion then ended without any (public) conclusion, so I'd like
> to pick up from there. There were questions related to:
>
>   a) how does the concept of BIP differ from simple plain JIRA?
>
>   b) what does it bring to the community?
>
> I'd like to outline my point of view on both of these aspects (they are
> related).
>
> BIP differs from JIRA by definition of a process:
>
> BIP -> vote -> consensus -> JIRA -> implementation
>
> This process (although it might seem a little unnecessary formal) brings
> the following benefits:
>
>   i) improves community's overall awareness of planned and in-progress
> features
>
>   ii) makes it possible to prioritize long-term goals (create "roadmap"
> that was mentioned in the referred thread)
>
>   iii) casting an explicit vote on each improvement proposal diminishes
> the probability of wasted work - as opposed to our current state, where
> it is hard to tell when there is a consensus and what actions need to be
> done in order to reach one if there isn't
>
>   iv) BIPs that eventually pass a vote can be regarded as "to be
> included in some short term" and so new BIPs can build upon them,
> without the risk of having to be redefined if their dependency for
> whatever reason don't make it to the implementation
>
> Although this "process" might look rigid and corporate, it actually
> brings better transparency and overall community health. This is
> especially important as the community grows and becomes more and more
> distributed. There are many, many open questions in this proposal that
> need to be clarified; my current intent is to get a grasp of how the
> community feels about this.
>
> Looking forward to any comments,
>
>   Jan
>
> [1]
>
> https://lists.apache.org/thread.html/4e1fffa2fde8e750c6d769bf4335853ad05b360b8bd248ad119cc185%40%3Cdev.beam.apache.org%3E
>
>


Re: [DISCUSS] Drop support for Flink 1.7

2020-02-19 Thread David Morávek
+1 for dropping 1.7, once we have 1.10 support ready

D.

On Tue, Feb 18, 2020 at 7:01 PM  wrote:

> Hi Ismael,
> yes, sure. The proposal would be to have a snapshot dependency in the
> feature branch. The snapshot must be changed to a release before merging to
> master.
> Jan
>
> Dne 18. 2. 2020 17:55 napsal uživatel Ismaël Mejía :
>
> Just to be sure, you mean Flink 1.11.0-SNAPSHOT ONLY in the next branch
> dependency?
> We should not have any SNAPSHOT dependency from other projects in Beam.
>
> On Tue, Feb 18, 2020 at 5:34 PM  wrote:
>
> Hi Jincheng,
> I think there should be a "process" for this. I would propose to:
>  a) create a new branch with support for the new (snapshot) Flink - currently
> that would mean Flink 1.11
>  b) as part of this branch, drop support for all versions up to N - 3
> I think that dropping old versions and adding the new version should be
> atomic, otherwise we risk releasing a Beam version with fewer than three
> supported Flink versions.
> I'd suggest starting with the 1.10 branch support and including the drop of 1.7
> in that branch. Once 1.10 gets merged, we should create 1.11 with a snapshot
> dependency to be able to keep up with the release cadence of Flink.
> WDYT?
>  Jan
>
> Dne 18. 2. 2020 15:34 napsal uživatel jincheng sun <
> sunjincheng...@gmail.com>:
>
> Hi folks,
>
> Apache Flink 1.10 has completed its release announcement [1]. We would now
> like to add a Flink 1.10 build target and make the Flink Runner compatible
> with Flink 1.10 [2]. So, I would suggest that the Apache Beam community keep
> at most three versions of the Flink runner, according to the update policy
> for Apache Flink releases [3], i.e. I think it's better to maintain the three
> versions 1.8/1.9/1.10 after adding the Flink 1.10 build target to the Flink
> runner.
>
> The current existence of Flink runner 1.7 will affect the upgrade of Flink
> runner 1.8.x and 1.9.x because the code of Flink 1.7 is too old; more detail
> can be found in [4]. So we need to drop support for Flink runner 1.7 as soon
> as possible.
>
> This discussion also CC to @User, due to the change will affect our users.
> And I would appreciate it if you could review the PR [5].
>
> Welcome any feedback!
>
> Best,
> Jincheng
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flin+-1-10-0-released-td37564.html
> 
> [2] https://issues.apache.org/jira/browse/BEAM-9295
> [3] https://flink.apache.org/downloads.html#update-policy-for-old-releases
> [4] https://issues.apache.org/jira/browse/BEAM-9299
> [5] https://github.com/apache/beam/pull/10884
>
>
>
>


Euphoria Java 8 DSL - proposal

2017-12-17 Thread David Morávek
Hello,


First of all, thanks for the amazing work the Apache Beam community is
doing!


In 2014, we started development of a runtime-independent Java 8 API that helps
us create unified big-data processing flows. It has been used as a core
building block of the Seznam.cz web crawler data infrastructure ever since. Its
design principles and execution model are very similar to Apache Beam's.


This API was open sourced in 2016, under the name Euphoria API:

https://github.com/seznam/euphoria


As it is very similar to Apache Beam, we feel that it is not worth duplicating
effort in terms of developing new runtimes and fine-tuning current ones.


The main blocker for us to switch to Apache Beam is the lack of a Java 8 API.
We propose integrating the Euphoria API into Apache Beam as a Java 8 DSL, in
order to share our effort with the community.


A simple example of Euphoria API usage can be found here:

https://github.com/seznam/euphoria/tree/master/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount
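
For readers who do not want to follow the link, a rough sketch of a word count
in the Euphoria API; operator and method names follow the linked examples, but
the details here are approximate rather than copied verbatim:

  // Given a Dataset<String> of input lines:
  Dataset<String> words = FlatMap.named("TOKENIZE")
      .of(lines)
      .using((String line, Collector<String> out) -> {
        for (String word : line.split("\\s+")) {
          out.collect(word);
        }
      })
      .output();

  Dataset<Pair<String, Long>> counted = CountByKey.named("COUNT")
      .of(words)
      .keyBy(word -> word)
      .output();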


If you feel that the Beam community could leverage our work, we would love to
start working on the Euphoria integration into Apache Beam (we already have a
working POC with a few basic operators implemented).


I look forward to hearing from you,

David


Re: Euphoria Java 8 DSL - proposal

2017-12-18 Thread David Morávek
Hello,

Thanks for the awesome feedback!

Romain:

We already use the Java Stream API in all operators where it makes sense (e.g.
ReduceByKey). Still not sure if it was a good choice, but it can be easily
converted to an iterator anyway.

Support for side outputs is coming soon; we have already done some initial
work on this.

Side inputs are not supported in the way you are used to from Beam, because
they can be replaced by a Join operator on the same key (if annotated with
broadcastHashJoin, it will be turned into a map-side join).

The only significant difference from Beam is that we decided not to abstract
serialization, so we need to add support for type hints because of type
erasure.

Fluent API:

The API is fluent within one operator. It is designed to "lead the
programmer", which means that they will only be offered methods that make
sense after the last method they used (e.g. in ReduceByKey, we know that after
keyBy a reduceBy method should come). It is implemented as a series of
builders.
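
As a hedged illustration of such a builder chain (names again approximate the
public Euphoria examples), each step of ReduceByKey only exposes the methods
that are valid next:

  Dataset<Pair<String, Long>> counts = ReduceByKey.named("COUNT")
      .of(words)                  // step 1: the input dataset
      .keyBy(word -> word)        // step 2: only keyBy(...) is offered here
      .valueBy(word -> 1L)        // step 3: valueBy(...) / reduceBy(...) come next
      .combineBy(Sums.ofLongs())  // step 4: an associative combine function
      .output();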

Davor:

Thanks, I'll contact you, and will start the process of having all the
necessary paperwork signed on our side, so we can get things moving.












On Mon, Dec 18, 2017 at 7:46 AM, Romain Manni-Bucau 
wrote:

> Hi guys
>
> A DSL would be very welcomed, in particular if fluent.
>
> Open question: did you study implementing the Stream API (surely extending it
> to have a BeamStream and a few more features like sides etc.)? It would be
> very natural, easily integrable anywhere, and would avoid a new API discovery.
>
> Hazelcast Jet did it so I don't see why Beam couldn't.
>
> Le 18 déc. 2017 07:26, "Davor Bonaci"  a écrit :
>
>> Hi David,
>> As JB noted, merging of these two projects is a great idea. If fact, some
>> of us have had those discussions in the past.
>>
>> Legally, nothing particular is strictly necessary as the code seems to
>> already be Apache 2.0 licensed. We don't, however, want to be perceived as
>> making hostile forks, so it would be great to file a Software Grant
>> Agreement with the ASF Secretary. I can help with the process, as necessary.
>>
>> Project alignment-wise, there aren't any particular blockers that I am
>> aware of. We welcome DSLs.
>>
>> Technically, the code would start in a feature branch. During this stage,
>> we'd need to validate a few things, including confirming that the code and
>> dependencies match ASF policy, automating testing in Beam's tooling, etc.
>> At that point, we'd take a community vote to accept the component into
>> master, and consider author(s) for committership in the overall project.
>>
>> Welcome to the ASF and Beam -- we are thrilled to have you! Hope this
>> helps, and please reach out if anybody on our end can help, including JB or
>> myself.
>>
>> Davor
>>
>>
>> On Sun, Dec 17, 2017 at 10:13 AM, Jean-Baptiste Onofré 
>> wrote:
>>
>>> Hi David,
>>>
>>> Generally speaking, having a different fluent DSL on top of the Beam SDK
>>> is great.
>>>
>>> I would like to take a look at your wordcount examples to give you
>>> complete feedback. I like the idea, and a fluent Java DSL is valuable.
>>>
>>> Let's wait for feedback from others. If we have a consensus, then I would be
>>> more than happy to help you with the donation (I worked on the Camel Java
>>> DSL a while ago, so I have some experience here).
>>>
>>> Thanks !
>>> Regards
>>> JB
>>>
>>> On 12/17/2017 07:00 PM, David Morávek wrote:
>>>
>>>> Hello,
>>>>
>>>>
>>>> First of all, thanks for the amazing work the Apache Beam community is
>>>> doing!
>>>>
>>>>
>>>> In 2014, we've started development of the runtime independent Java 8
>>>> API, that helps us to create unified big-data processing flows. It has been
>>>> used as a core building block of Seznam.cz web crawler data infrastructure
>>>> every since. Its design principles and execution model are very similar to
>>>> Apache Beam.
>>>>
>>>>
>>>> This API was open sourced in 2016, under the name Euphoria API:
>>>>
>>>> https://github.com/seznam/euphoria
>>>>
>>>>
>>>> As it is very similar to Apache Beam, we feel, that it is not worth of
>>>> duplicating effort in terms of development of new runtimes and fine-tuning
>>>> of current ones.
>>>>
>>>>
>>>> The main blocker for us to switch to Apache Beam is lack of the Java 8
>>>> API. *W*e propose the integrati

Re: Euphoria Java 8 DSL - proposal

2018-01-02 Thread David Morávek
>>>>> (on shuffle a la hadoop). Also if I remember well the 'time' model of
>>>>> Euphoria was simpler than Beam's. I talk about all of this because I
>>>>> am curious about what parts of the Euphoria model you guys had to
>>>>> sacrifice to support Beam, and what parts of Beam's model should still
>>>>> be integrated into Euphoria (and if there is a straightforward path to
>>>>> do it).
>>>>>
>>>>> If I understand well, if this gets merged into Apache, this means that
>>>>> Euphoria's current implementation would be superseded by this DSL? I
>>>>> am curious because I would like to understand your level of investment
>>>>> in supporting the future of this DSL.
>>>>>
>>>>> Thanks and congrats again !
>>>>> Ismaël
>>>>>
>>>>> On Mon, Dec 18, 2017 at 10:12 AM, Jean-Baptiste Onofré <
>>>>> j...@nanthrax.net> wrote:
>>>>>
>>>>>> Depending on the donation, you would need an ICLA for each contributor,
>>>>>> and
>>>>>> a CCLA in addition to the SGA.
>>>>>>
>>>>>> We can sync with Davor and I for the legal stuff.
>>>>>> However, I would wait a little bit just to have feedback from the
>>>>>> whole team
>>>>>> and start a formal vote.
>>>>>>
>>>>>> I would be happy to start the formal vote.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 12/18/2017 10:03 AM, David Morávek wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Thanks for the awesome feedback!
>>>>>>>
>>>>>>> Romain:
>>>>>>>
>>>>>>> We already use Java Stream API in all operators where it makes sense
>>>>>>> (eg.:
>>>>>>> ReduceByKey). Still not sure if it was a good choice, but i can be
>>>>>>> easily
>>>>>>> converted to iterator anyway.
>>>>>>>
>>>>>>> Side outputs support is coming soon, we already made an initial work
>>>>>>> on
>>>>>>> this.
>>>>>>>
>>>>>>> Side inputs are not supported in a way you are used to from beam,
>>>>>>> because
>>>>>>> it can be replaced by Join operator on the same key (if annotated
>>>>>>> with
>>>>>>> broadcastHashJoin, it will be turned into map side join).
>>>>>>>
>>>>>>> Only significant difference from Beam is, that we decided not to
>>>>>>> abstract
>>>>>>> serialization, so we need to add support for Type Hints, because of
>>>>>>> type
>>>>>>> erasure.
>>>>>>>
>>>>>>> Fluent API:
>>>>>>>
>>>>>>> API is fluent within one operator. It is designed to "lead the
>>>>>>> programmer", which means, that he we'll be only offered methods that
>>>>>>> makes
>>>>>>> sense after the last method he used (eg.: in ReduceByKey, we know
>>>>>>> that after
>>>>>>> keyBy either reduceBy method should come). It is implemented as a
>>>>>>> series of
>>>>>>> builders.
>>>>>>>
>>>>>>> Davor:
>>>>>>>
>>>>>>> Thanks, I'll contact you, and will start the process of having all
>>>>>>> the
>>>>>>> necessary paperwork signed on our side, so we can get things moving.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 7:46 AM, Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com
>>>>>>> <mailto:rmannibu...@gmail.com>> wrote:
>>>>>>>
>>>>>>>  Hi guys
>>>>>>>
>>>>>>>  A DSL would be very welcomed,

Re: Euphoria Java 8 DSL - proposal

2018-01-02 Thread David Morávek
Hello JB,

Perfect! I'm already on the Beam Slack workspace, I'll contact you once I
get to the office.

Thanks!
D.

On Wed, Jan 3, 2018 at 6:19 AM, Jean-Baptiste Onofré 
wrote:

> Hi David,
>
> absolutely !! Let's move forward on the preparation steps.
>
> Are you on Slack and/or hangout to plan this ?
>
> Thanks,
> Regards
> JB
>
> On 01/02/2018 05:35 PM, David Morávek wrote:
>
>> Hello JB,
>>
>> can we help in any way to move things forward?
>>
>> Thanks,
>> D.
>>
>> On Mon, Dec 18, 2017 at 4:28 PM, Jean-Baptiste Onofré > <mailto:j...@nanthrax.net>> wrote:
>>
>> Thanks Jan,
>>
>> It makes sense.
>>
>> Let me take a look on the code to understand the "interaction".
>>
>> Regards
>> JB
>>
>>
>> On 12/18/2017 04:26 PM, Jan Lukavský wrote:
>>
>> Hi JB,
>>
>> basically you are not wrong. The project started about three or
>> four
>> years ago with a goal to unify batch and streaming processing into
>> single portable, executor independent API. Because of that, it is
>> currently "close" to Beam in this sense. But we don't see much
>> added
>> value keeping this as a separate project, with one of the key
>> differences to be the API (not the model itself), so we would
>> like to
>> focus on translation from Euphoria API to Beam's SDK. That's why
>> we
>> would like to see it as a DSL, so that it would be possible to use
>> Euphoria API with Beam's runners as much natively as possible.
>>
>> I hope I didn't make the subject even more unclear, if so, I'll
>> be happy
>> to explain anything in more detail. :-)
>>
>> Jan
>>
>>
>> On 12/18/2017 04:08 PM, Jean-Baptiste Onofré wrote:
>>
>> Hi Jan,
>>
>> Thanks for your answers.
>>
>> However, they confused me ;)
>>
>> Regarding what you replied, Euphoria seems like a programming
>> model/SDK "close" to Beam more than a DSL on top of an
>> existing Beam
>> SDK.
>>
>> Am I wrong ?
>>
>> Regards
>> JB
>>
>> On 12/18/2017 03:44 PM, Jan Lukavský wrote:
>>
>> Hi Ismael,
>>
>> basically we adopted the Beam's design regarding
>> partitioning
>> (https://github.com/seznam/euphoria/issues/160
>> <https://github.com/seznam/euphoria/issues/160>) and
>> implemented
>> the sorting manually
>> (https://github.com/seznam/euphoria/issues/158
>> <https://github.com/seznam/euphoria/issues/158>). I'm
>> not aware
>> of the time model differences (Euphoria supports
>> ingestion and
>> event time, we don't support processing time by decision).
>> Regarding other differences (looking into Beam capability
>> matrix, I'd say that):
>>
>>- we don't support stateful FlatMap (i.e. ParDo) for
>> now
>> (https://github.com/seznam/euphoria/issues/192
>> <https://github.com/seznam/euphoria/issues/192>)
>>
>>- we don't support side inputs (by decision now, but
>> might be
>> reconsidered) and outputs
>> (https://github.com/seznam/euphoria/issues/124
>> <https://github.com/seznam/euphoria/issues/124>)
>>
>>
>>- we support complete event-time windows (non-merging,
>> merging, aligned, unaligned) and time control
>>
>>- we don't support processing time by decision (might
>> be
>> reconsidered if a valid use-case is found)
>>
>>- we support window triggering based on both time and
>> data,
>> including discarding and accumulating (without
>> accumulating &
>> retracting)
>>
>> All our executors (runners) - Flink, Spark and Local -
>> implement
>> the complete model, which we enforce using "operator test
>> kit"
>> that all executors must pas

Re: Euphoria Java 8 DSL - proposal

2018-02-27 Thread David Morávek
Hi Davor,

sorry for the delay, we were blocked by our legal department. I've sent
both the SGA and CCLA to priv...@apache.beam.org; please let me know if you
need anything else.

Regards,
David

On Mon, Feb 19, 2018 at 6:13 AM, Jean-Baptiste Onofré 
wrote:

> Hi Davor,
>
> We still have some discussion/paperwork on Euphoria side (SGA, ...).
>
> So, it's on track but it takes a little more time than expected.
>
> Regards
> JB
>
> On 02/19/2018 05:40 AM, Davor Bonaci wrote:
> > I may have missed things, but any update on the progress of this
> donation?
> >
> > On Tue, Jan 2, 2018 at 10:52 PM, Jean-Baptiste Onofré  > <mailto:j...@nanthrax.net>> wrote:
> >
> > Great !
> >
> > Thanks !
> > Regards
> > JB
> >
> > On 01/03/2018 07:29 AM, David Morávek wrote:
> >
> > Hello JB,
> >
> > Perfect! I'm already on the Beam Slack workspace, I'll contact
> you once
> > I get to the office.
> >
> > Thanks!
> > D.
> >
> > On Wed, Jan 3, 2018 at 6:19 AM, Jean-Baptiste Onofré <
> j...@nanthrax.net
> > <mailto:j...@nanthrax.net> <mailto:j...@nanthrax.net
> > <mailto:j...@nanthrax.net>>> wrote:
> >
> > Hi David,
> >
> >         absolutely !! Let's move forward on the preparation steps.
> >
> > Are you on Slack and/or hangout to plan this ?
> >
> > Thanks,
> > Regards
> > JB
> >
> > On 01/02/2018 05:35 PM, David Morávek wrote:
> >
> > Hello JB,
> >
> > can we help in any way to move things forward?
> >
> > Thanks,
> > D.
> >
> > On Mon, Dec 18, 2017 at 4:28 PM, Jean-Baptiste Onofré
> > mailto:j...@nanthrax.net>
> > <mailto:j...@nanthrax.net <mailto:j...@nanthrax.net>>
> > <mailto:j...@nanthrax.net <mailto:j...@nanthrax.net>
> > <mailto:j...@nanthrax.net <mailto:j...@nanthrax.net>>>>
> wrote:
> >
> >  Thanks Jan,
> >
> >  It makes sense.
> >
> >  Let me take a look on the code to understand the
> "interaction".
> >
> >  Regards
> >  JB
> >
> >
> >  On 12/18/2017 04:26 PM, Jan Lukavský wrote:
> >
> >  Hi JB,
> >
> >  basically you are not wrong. The project
> started about
> > three or
> > four
> >  years ago with a goal to unify batch and
> streaming
> > processing into
> >  single portable, executor independent API.
> Because of
> > that, it is
> >  currently "close" to Beam in this sense. But we
> don't
> > see much
> > added
> >  value keeping this as a separate project, with
> one of
> > the key
> >  differences to be the API (not the model
> itself), so we
> > would
> > like to
> >  focus on translation from Euphoria API to
> Beam's SDK.
> > That's why we
> >  would like to see it as a DSL, so that it would
> be
> > possible to use
> >  Euphoria API with Beam's runners as much
> natively as
> > possible.
> >
> >  I hope I didn't make the subject even more
> unclear, if
> > so, I'll
> > be happy
> >  to explain anything in more detail. :-)
> >
> >  Jan
> >
> >
> >  On 12/18/2017 04:08 PM, Jean-Baptiste Onofré
> wrote:
> >
> >  Hi Jan,
> >
> >  Thanks for your answers.
> >
> >  However, they confused me ;)
> >
> >  Regarding what you replied, Euphoria seems
> like a
> > programming
> >  model/SDK "close" to Beam more than a DSL
> on top of an
> > 

Re: Survey: what is everyone working on that you want to share?

2018-05-15 Thread David Morávek
Hi Kenn,

Java 8 DSL

JIRA:

dsl-euphoria

/ BEAM-3900 

Feature branch: dsl-euphoria


Contact: David Moravek 

Description: Java 8 wrapper over the Beam Java SDK, based on Euphoria API
 project.


Thanks,

David



On Tue, May 15, 2018 at 7:27 AM, Kenneth Knowles  wrote:

> Hi all,
>
> TL;DR: for anyone who is willing & able to answer: What big-picture thing
> are you working on? I want to highlight it here:
>
> https://beam.apache.org/contribute/#works-in-progress
>
> (I want each of these to have a nice description, too! And a saved JIRA
> search for starter tasks would be clever...)
>
>  context 
>
> Following feedback from the Beam summit and other discussions, I've been
> working with Melissa and looping in folks to revamp the website to have a
> more welcoming tone and to make it easier for newcomers to get started.
>
> Part of that is making the site and guide more concise and making the
> entry points more prominent and interesting. Starter tasks are OK, but to
> get a lasting engagement the idea is for starter tasks connect a newcomer
> with ongoing projects. Most importantly, they are more likely to have
> exciting interactions with experienced contributors, and to have follow-up
> work to the starter task.
>
> Kenn
>


Re: Survey: what is everyone working on that you want to share?

2018-05-28 Thread David Morávek
here you go: https://github.com/apache/beam-site/pull/453

Thanks,
D.

On Tue, May 15, 2018 at 11:51 PM, Kenneth Knowles  wrote:

> I wanted to bring back to this thread that, yes, I would most like pull
> requests :-)
>
> And please don't feel like you have to follow the pattern already there.
> It would be better if each project had a sentence describing it (be
> concise, still) and the whole content fit right in the page.
>
> On Tue, May 15, 2018 at 2:48 PM Valentyn Tymofieiev 
> wrote:
>
>> Hi Kenn,
>>
>> I sent https://github.com/apache/beam-site/pull/441 to cover efforts
>> related to Python 3 support in Beam.
>>
>> Thanks,
>> Valentyn
>>
>> On Tue, May 15, 2018 at 10:27 AM, David Morávek 
>> wrote:
>>
>>> Hi Kenn,
>>>
>>> Java 8 DSL
>>>
>>> JIRA:
>>> <https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20sdk-go>
>>> dsl-euphoria
>>> <https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20dsl-euphoria>
>>> / BEAM-3900 <https://issues.apache.org/jira/browse/BEAM-3900>
>>>
>>> Feature branch: dsl-euphoria
>>> <https://github.com/apache/beam/tree/dsl-euphoria>
>>>
>>> Contact: David Moravek 
>>>
>>> Description: Java 8 wrapper over the Beam Java SDK, based on Euphoria
>>> API <https://github.com/seznam/euphoria> project.
>>>
>>>
>>> Thanks,
>>>
>>> David
>>>
>>>
>>>
>>> On Tue, May 15, 2018 at 7:27 AM, Kenneth Knowles  wrote:
>>>
>>>> Hi all,
>>>>
>>>> TL;DR: for anyone who is willing & able to answer: What big-picture
>>>> thing are you working on? I want to highlight it here:
>>>>
>>>> https://beam.apache.org/contribute/#works-in-progress
>>>>
>>>> (I want each of these to have a nice description, too! And a saved JIRA
>>>> search for starter tasks would be clever...)
>>>>
>>>>  context 
>>>>
>>>> Following feedback from the Beam summit and other discussions, I've
>>>> been working with Melissa and looping in folks to revamp the website to
>>>> have a more welcoming tone and to make it easier for newcomers to get
>>>> started.
>>>>
>>>> Part of that is making the site and guide more concise and making the
>>>> entry points more prominent and interesting. Starter tasks are OK, but to
>>>> get a lasting engagement the idea is for starter tasks connect a newcomer
>>>> with ongoing projects. Most importantly, they are more likely to have
>>>> exciting interactions with experienced contributors, and to have follow-up
>>>> work to the starter task.
>>>>
>>>> Kenn
>>>>
>>>
>>>
>>


Re: GroupByKey with sorted values within key

2018-05-30 Thread David Morávek
Thanks for pointing us in the right direction. We'll try to prototype a custom
translation for the Spark runner within the next sprint. In order to do so, I
have a few questions:

1) Should we move the SortValues transform to beam-sdks-java-core or just add
it as a Spark runner dependency?
2) I think we should try to make SortValues more flexible by letting the user
provide a custom value comparator; sorting lexicographically by a secondary
key may be painful in some use cases. What do you think?

side note:
I agree that usually the top N values, which fit in memory, are sufficient and
we can combine them using a PQ, but in practice we still have pipelines that
need to do top-N selection over data that does not fit in memory for a single
key.

D.
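
For context, a hedged sketch of how the SortValues extension is applied today:
group by a primary key first, then sort each group's values by a secondary key.
Type parameters and options below are illustrative:

  // Input keyed by a primary key, with a (secondaryKey, value) pair as the value.
  PCollection<KV<String, KV<String, Integer>>> keyed = ...;

  PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
      keyed.apply(GroupByKey.create());

  // Sorts each Iterable lexicographically on the encoded secondary key,
  // spilling to disk via the external sorter when needed.
  PCollection<KV<String, Iterable<KV<String, Integer>>>> sorted =
      grouped.apply(
          SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));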

On Wed, May 30, 2018 at 5:28 PM, Lukasz Cwik  wrote:

> SortValues does not have a defined & documented URN yet. Once a Runner is
> providing such an override, it will happen. No runner publicly provides one
> to my knowledge.
>
> On Wed, May 30, 2018 at 8:08 AM Kenneth Knowles  wrote:
>
>> I can see a few usability issues here. Totally agree w/ Luke, just noting:
>>
>>  - The naming is slightly misleading because SortValues is actually
>> already GBK+SortValues.
>>  - It also makes things look less supported when they are in the
>> extensions/ folder. I'd say we should have a better place to put such a
>> library if it is the official public implementation. The word "extensions"
>> doesn't seem particularly accurate or meaningful to me.
>>
>> Q: Does SortValues have a defined & documented URN yet?
>>
>> Kenn
>>
>> On Wed, May 30, 2018 at 7:52 AM Lukasz Cwik  wrote:
>>
>>> Each runner can choose to override the SortValues PTransform with their
>>> own internal offering. For example Spark overrides global combine[1] during
>>> pipeline translation. If Spark detected the SortValues PTransform during
>>> translation, it could override the offering with something that used
>>> repartitionAndSortWithinPartitions.
>>>
>>> GroupByKeyAndSortValuesOnly inside Dataflow exists to support a specific
>>> use case. Users should rely on SortValues as it is the public
>>> implementation for sorting.
>>>
>>> 1: https://github.com/apache/beam/blob/85dcab56268fbac923ffd5885489ee
>>> 154f097fc5/runners/spark/src/main/java/org/apache/beam/
>>> runners/spark/translation/TransformTranslator.java#L200
>>>
>>> As a side note, its uncommon where you need to sort all values, usually
>>> top 100 suffices and can be implemented much more efficiently with a
>>> combiner when compared to sorting.
>>>
>>> On Wed, May 30, 2018 at 3:38 AM  wrote:
>>>
 Hi,
  I have a question: I am trying to do a translation in dsl-euphoria of
 “GroupByKey with sorted values within key” to Beam. I am aware of java sdk
 extensions SortValues, but it doesn’t have sufficient abstraction for
 runners.

 I noticed that in DataflowRunner there is translation of batch
 GroupByKey to GroupByKeyAndSortValuesOnly but is it considered to have it
 in beam core so for example SparkRunner could translate “GroupByKey with
 sorted values within key” with their internals such as
 repartitionAndSortWithinPartitions.

 Thank you.
 Marek Simunek

>>>


Re: [DISCUSS] Unification of Hadoop related IO modules

2018-09-07 Thread David Morávek
+1 for option 3 as it should be the least painful option for the current users

D.

Sent from my iPhone

> On 7 Sep 2018, at 19:50, Tim  wrote:
> 
> Another +1 for option 3 (and preference of HadoopFormatIO naming).
> 
> Thanks Alexey,
> 
> Tim
> 
> 
>> On 7 Sep 2018, at 19:13, Andrew Pilloud  wrote:
>> 
>> +1 for option 3. That approach will keep the mapping clean if SQL supports 
>> this IO. It would be good to put the proxy in the old module and move the 
>> implementation now. That way the old module can be easily deleted when the 
>> time comes.
>> 
>> Andrew
>> 
>>> On Fri, Sep 7, 2018 at 6:15 AM Robert Bradshaw  wrote:
>>> OK, good, that's what I thought. So I stick by (3) which
>>> 
>>> 1) Cleans up the library for all future uses (hopefully the majority of all 
>>> users :). 
>>> 2) Is fully backwards compatible for existing users, minimizing disruption, 
>>> and giving them time to migrate. 
>>> 
 On Fri, Sep 7, 2018 at 2:51 PM Alexey Romanenko  
 wrote:
 In next release it will be still compatible because we keep module 
 “hadoop-input-format” but we make it deprecated and propose to use it 
 through module “hadoop-format” and proxy class HadoopFormatIO (or 
 HadoopMapReduceFormatIO, whatever we name it) which will provide 
 Write/Read functionality by using MapReduce InputFormat or OutputFormat 
 classes. 
 Then, in future releases after next one, we can drop “hadoop-input-format” 
  since it was deprecated and we provided a time to move to new API. I 
 think this is less painful way for user but most complicated for us if the 
 final goal it to merge “hadoop-input-format” and “hadoop-output-format” 
 together.
 
> On 7 Sep 2018, at 13:45, Robert Bradshaw  wrote:
> 
> Agree about not impacting users. Perhaps I misread (3), isn't it fully 
> backwards compatible as well? 
> 
> On Fri, Sep 7, 2018 at 1:33 PM Jean-Baptiste Onofré  
> wrote:
>> Hi,
>> 
>> in order to limit the impact for the existing users on Beam 2.x series,
>> I would go for (1).
>> 
>> Regards
>> JB
>> 
>> On 06/09/2018 17:24, Alexey Romanenko wrote:
>> > Hello everyone,
>> > 
>> > I’d like to discuss the following topic (see below) with community 
>> > since
>> > the optimal solution is not clear for me.
>> > 
>> > There is Java IO module, called “/hadoop-input-format/”, which allows 
>> > to
>> > use MapReduce InputFormat implementations to read data from different
>> > sources (for example, 
>> > org.apache.hadoop.mapreduce.lib.db.DBInputFormat).
>> > According to its name, it has only “Read" and it's missing “Write” 
>> > part,
>> > so, I'm working on “/hadoop-output-format/” to support MapReduce
>> > OutputFormat (PR 6306 ). For
>> > this I created another module with this name. So, in the end, we will
>> > have two different modules “/hadoop-input-format/” and
>> > “/hadoop-output-format/” and it looks quite strange for me since, 
>> > afaik,
>> > every existing Java IO that we have encapsulates Read and Write parts
>> > into one module. Additionally, we have “/hadoop-common/” and
>> > /“hadoop-file-system/” as other hadoop-related modules. 
>> > 
>> > Now I’m thinking how it will be better to organise all these Hadoop
>> > modules better. There are several options in my mind: 
>> > 
>> > 1) Add new module “/hadoop-output-format/” and leave all Hadoop modules
>> > “as it is”. 
>> > Pros: no breaking changes, no additional work 
>> > Cons: not logical for users to have the same IO in two different 
>> > modules
>> > and with different names.
>> > 
>> > 2) Merge “/hadoop-input-format/” and “/hadoop-output-format/” into one
>> > module called, say, “/hadoop-format/” or “/hadoop-mapreduce-format/”,
>> > keep the other Hadoop modules “as it is”.
>> > Pros: to have InputFormat/OutputFormat in one IO module which is 
>> > logical
>> > for users
>> > Cons: breaking changes for user code because of module/IO renaming 
>> > 
>> > 3) Add new module “/hadoop-format/” (or “/hadoop-mapreduce-format/”)
>> > which will include new “write” functionality and be a proxy for old
>> > “/hadoop-input-format/”. In its turn, “/hadoop-input-format/” should
>> > become deprecated and be finally moved to common “/hadoop-format/”
>> > module in future releases. Keep the other Hadoop modules “as it is”.
>> > Pros: finally it will be only one module for hadoop MR format; changes
>> > are less painful for user
>> > Cons: hidden difficulties of implementation this strategy; a bit
>> > confusing for user 
>> > 
>> > 4) Add new module “/hadoop/” and move all already existed modules there
>> > as submodules (like we have for “/io/google-cloud-platform/”), merge
>> > “/hadoop-input-form

[PROPOSAL] (BEAM-5309) Support for streaming HadoopOutputFormatIO

2018-09-13 Thread David Morávek
Hello Beamers,

my team is currently starting to work on streaming support for the
HadoopOutputFormatIO introduced by Alexey. I've tried to summarize my
thoughts about the implementation details in a short design document.

It would be great if the document could get a community review, both because
there are some unclear parts that need to be solved and to validate the
approach.

Thanks,
D.


Re: [PROPOSAL] (BEAM-5309) Support for streaming HadoopOutputFormatIO

2018-09-13 Thread David Morávek
sorry, document link is here: https://s.apache.org/beam-streaming-hofio :)

On Thu, Sep 13, 2018 at 4:47 PM David Morávek 
wrote:

> Hello Beamers,
>
> my team is currently starting to work on streaming support for the
> HadoopOutputFormatIO introduced by Alexey. I've tried to summarize my
> thoughts about the implementation details in the short design document.
>
> It would be great, if the document can get a community review as there are
> some unclear parts, that need to be solved and to validate the approach.
>
> Thanks,
> D.
>


SparkRunner - GroupByKey

2018-09-14 Thread David Morávek
Hello,

currently, we are trying to move one of our large-scale batch jobs (~100TB
inputs) from our Euphoria-based
SparkRunner to Beam's Spark runner and we came across the following issue.

Because we rely on the Hadoop ecosystem, we need to group outputs by
TaskAttemptID in order to use OutputFormats based on FileOutputFormat.

We do this by using *GroupByKey*, but we came across the known problem
that all values for any single key need to fit in memory at once.

I did some quick research and I think that the following needs to be addressed:
a) We cannot use Spark's *groupByKey*, because it requires all values to
fit in memory for a single key (it is implemented as a "list combiner")
b) *ReduceFnRunner* iterates over values multiple times in order to also group
by window

In the Euphoria-based runner, we solved this for *non-merging* windowing by
using Spark's *repartitionAndSortWithinPartitions*, where we sorted the output
by key and window so that the output could be processed sequentially.

Did anyone run into the same issue? Is there currently any workaround for
this? How should we approach this?

Thanks,
David
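
For completeness, a rough sketch of the Spark-side pattern used in the
Euphoria-based runner; this is Spark code, not Beam, and CompositeKey, its
comparator and processSequentially are illustrative placeholders, not real
classes:

  // Key every element by (logical key, window), partition by the logical key
  // only, and secondary-sort within each partition by the window/timestamp part.
  JavaPairRDD<CompositeKey, byte[]> sorted =
      keyedByKeyAndWindow.repartitionAndSortWithinPartitions(
          new HashPartitioner(numPartitions),  // assumes CompositeKey.hashCode() uses only the logical key
          CompositeKey.comparator());          // orders by (key, window)

  // Each partition can now be consumed sequentially, one (key, window) group at
  // a time, without materializing a whole group in memory.
  JavaRDD<OutputRecord> output =
      sorted.mapPartitions(elements -> processSequentially(elements));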


Re: SparkRunner - GroupByKey

2018-09-14 Thread David Morávek
Hello Robert,

thanks for the answer! Spark allows us to sort a single partition (after
repartitioning) by a user-provided comparator, so it is definitely possible to
do a secondary sort by timestamp. The "more intelligent ReduceFnRunner" you
are talking about, is it part of the Beam codebase already (I guess it would
lower the contribution investment if we tried to fix this)?

This would definitely work for our use case (we used the exact same approach in
our custom SparkRunner).

Although, there is one thing to consider. This approach would solve the
scaling issue, but it would probably be less effective at smaller
scale. I think this could be solved by providing support for multiple
translators for a single operator and letting the user "hint" to the
translation layer which one to use. What do you think?

Thanks,
D.

On Fri, Sep 14, 2018 at 4:10 PM Robert Bradshaw  wrote:

>
> If Spark supports producing grouped elements in timestamp order, a more
> intelligent ReduceFnRunner can be used. (We take advantage of that in
> Dataflow for example.)
>
> For non-merging windows, you could also put the window itself (or some
> subset thereof) into the key resulting in smaller groupings. I'm not sure I
> understand your output requirements enough to know if this would work.
>
> On Fri, Sep 14, 2018 at 3:28 PM David Morávek 
> wrote:
>
>> Hello,
>>
>> currently, we are trying to move one of our large scale batch jobs
>> (~100TB inputs) from our Euphoria <http://github.com/seznam/euphoria>
>> based SparkRunner to Beam's Spark runner and we came across the following
>> issue.
>>
>> Because we rely on hadoop ecosystem, we need to group outputs by
>> TaskAttemptID, in order to use OutputFormats based on FileOutputFormat.
>>
>> We do this by using *GroupByKey*, but we came across the known problem,
>> that all values for any single key need to fit in-memory at once.
>>
>> I did a quick research and I think that following needs to be addressed:
>> a) We can not use Spark's *groupByKey*, because it requires all values
>> to fit in memory for a single key (it is implemented as "list combiner")
>> b) *ReduceFnRunner* iterates over values multiple times in order to
>> group also by window
>>
>
>> In Euphoria based runner, we solved this for *non-merging* windowing by
>> using Spark's *repartitionAndSortWithinPartitions*, where we sorted
>> output by key and window, so the output could be processed sequentially.
>>
>> Did anyone run into the same issue? Is there currently any workaround for
>> this? How should we approach this?
>>
>> Thanks,
>> David
>>
>>


Re: [VOTE] Donating the Dataflow Worker code to Apache Beam

2018-09-14 Thread David Morávek
+1



> On 15 Sep 2018, at 00:59, Anton Kedin  wrote:
> 
> +1
> 
>> On Fri, Sep 14, 2018 at 3:22 PM Alan Myrvold  wrote:
>> +1
>> 
>>> On Fri, Sep 14, 2018 at 3:16 PM Boyuan Zhang  wrote:
>>> +1
>>> 
 On Fri, Sep 14, 2018 at 3:15 PM Henning Rohde  wrote:
 +1
 
> On Fri, Sep 14, 2018 at 2:40 PM Ahmet Altay  wrote:
> +1 (binding)
> 
>> On Fri, Sep 14, 2018 at 2:35 PM, Lukasz Cwik  wrote:
>> +1 (binding)
>> 
>>> On Fri, Sep 14, 2018 at 2:34 PM Pablo Estrada  
>>> wrote:
>>> +1
>>> 
 On Fri, Sep 14, 2018 at 2:32 PM Andrew Pilloud  
 wrote:
 +1
 
> On Fri, Sep 14, 2018 at 2:31 PM Lukasz Cwik  wrote:
> There was generally positive support and good feedback[1] but it was 
> not unanimous. I wanted to bring the donation of the Dataflow worker 
> code base to Apache Beam master to a vote.
> 
> +1: Support having the Dataflow worker code as part of Apache Beam 
> master branch
> -1: Dataflow worker code should live elsewhere
> 
> 1: 
> https://lists.apache.org/thread.html/89efd3bc1d30f3d43d4b361a5ee05bd52778c9dc3f43ac72354c2bd9@%3Cdev.beam.apache.org%3E
> 


Re: SparkRunner - GroupByKey

2018-09-17 Thread David Morávek
Thanks! I've created BEAM-5392
<https://issues.apache.org/jira/browse/BEAM-5392> to track the issue.

On Fri, Sep 14, 2018 at 4:46 PM Robert Bradshaw  wrote:

> On Fri, Sep 14, 2018 at 4:22 PM David Morávek 
> wrote:
>
>> Hello Robert,
>>
>> thanks for the answer! Spark allows us to sort a single partition
>> (after repartitioning) by a user-provided comparator, so it is definitely
>> possible to do a secondary sort by timestamp. The "more intelligent
>> ReduceFnRunner" you are talking about, is it part of the Beam codebase already
>> (I guess it would lower the contribution investment if we tried to fix
>> this)?
>>
>
> No, but it is part of the dataflow worker codebase we're trying to donate
> (being discussed on the other thread on this very list today), so hopefully
> soon.
>
> This would definitely work for our use case (we used the exact same approach
>> in our custom SparkRunner).
>>
>> Although, there is one thing to consider. This approach would solve the
>> scaling issue, but it would be probably less effective for the smaller
>> scale. I think this could be solved by providing support for multiple
>> translators for a single operator and let user "hint" the translation layer
>> to decide which one to use. What do you think?
>>
>
> If at all possible, I generally prefer to avoid providing hints like this
> that the user needs to use to get decent performance as it simply doesn't
> scale (in many directions). Fortunately in this case, though you have to be
> a bit more careful about things, it is not less efficient.
>
> On Fri, Sep 14, 2018 at 4:10 PM Robert Bradshaw 
>> wrote:
>>
>>>
>>> If Spark supports producing grouped elements in timestamp order, a more
>>> intelligent ReduceFnRunner can be used. (We take advantage of that in
>>> Dataflow for example.)
>>>
>>> For non-merging windows, you could also put the window itself (or some
>>> subset thereof) into the key resulting in smaller groupings. I'm not sure I
>>> understand your output requirements enough to know if this would work.
>>>
>>> On Fri, Sep 14, 2018 at 3:28 PM David Morávek 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> currently, we are trying to move one of our large scale batch jobs
>>>> (~100TB inputs) from our Euphoria <http://github.com/seznam/euphoria>
>>>> based SparkRunner to Beam's Spark runner and we came across the following
>>>> issue.
>>>>
>>>> Because we rely on hadoop ecosystem, we need to group outputs by
>>>> TaskAttemptID, in order to use OutputFormats based on FileOutputFormat.
>>>>
>>>> We do this by using *GroupByKey*, but we came across the known
>>>> problem, that all values for any single key need to fit in-memory at once.
>>>>
>>>> I did a quick research and I think that following needs to be addressed:
>>>> a) We can not use Spark's *groupByKey*, because it requires all values
>>>> to fit in memory for a single key (it is implemented as "list combiner")
>>>> b) *ReduceFnRunner* iterates over values multiple times in order to
>>>> group also by window
>>>>
>>>
>>>> In Euphoria based runner, we solved this for *non-merging* windowing
>>>> by using Spark's *repartitionAndSortWithinPartitions*, where we sorted
>>>> output by key and window, so the output could be processed sequentially.
>>>>
>>>> Did anyone run into the same issue? Is there currently any workaround
>>>> for this? How should we approach this?
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>>


Re: [Proposal] Euphoria DSL - looking for reviewers

2018-10-10 Thread David Morávek
Hello Max,

It would be great if you could do more of a "general" review; the code base
is fairly large, well tested, and was already reviewed internally by
several people.

We would like to have the overall approach and design decisions validated
by the community and get some input on what could be improved and whether we
are headed in the right direction.
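
For reviewers who want a quick feel for the API before diving in, here is a
minimal sketch. The operator names follow the Euphoria documentation, but the
package paths, coder setup, and runner defaults below are assumptions rather
than an excerpt from the pull request:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class EuphoriaFlavorSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> words = p.apply(Create.of("beam", "euphoria", "beam"));
    // Euphoria operators consume and produce plain PCollections, so they can
    // be mixed freely with regular Beam transforms in the same pipeline.
    PCollection<KV<String, Long>> counted =
        CountByKey.of(words)
            .keyBy((String w) -> w)
            .output();
    p.run().waitUntilFinish();
  }
}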

Thanks,
David

On Wed, Oct 10, 2018 at 2:21 PM Maximilian Michels  wrote:

> That is a huge PR! :) Euphoria looks great. Especially for people coming
> from Flink/Spark. I'll check out the documentation.
>
> Do you have any specific code parts which you want to have reviewed?
>
> Thanks,
> Max
>
> On 10.10.18 10:30, Jean-Baptiste Onofré wrote:
> > Hi,
> >
> > Thanks for all the work you are doing on this DSL !
> >
> > I tried to follow the features branch for a while. I'm still committed
> > to  move forward on that front,  but more reviewers would be great.
> >
> > Regards
> > JB
> >
> > On 10/10/2018 10:26, Plajt, Vaclav wrote:
> >> Hello Beam devs,
> >> we finished our main goals in development of Euphoria DSL. It is Easy to
> >> use Java 8 API build on top of the Beam's Java SDK. API provides a
> >> high-level abstraction of data transformations, with focus on the Java 8
> >> language features (e.g. lambdas and streams). It is fully inter-operable
> >> with existing Beam SDK and convertible back and forth. It allows fast
> >> prototyping through use of (optional) Kryo based coders and can be
> >> seamlessly integrated into existing Beam Pipelines.
> >>
> >> Now we believe that it is the time to start discussion about it with the
> >> community. Which will hopefully lead to vote about adapting it into
> >> Apache Beam project. Most of main ideas and development goals were
> >> presented in Beam Summit in London [1].
> >>
> >> We are looking for reviewers within the community. Please start with
> >> documentation [2] or design document [3]. Our contribution is divided to
> >> two modules: `org.apache.beam:beam-sdks-java-extensions-euphoria` and
> >> `org.apache.beam:beam-sdks-java-extensions-kryo`. Rest of the code base
> >> remains untouched.
> >> All the checks in MR [5] are passing with exception of "Website
> >> PreCommit". Which seems to be broken, little help here would be
> appreciated.
> >>
> >> Thank you
> >> We are looking forward for your feedback.
> >> {david.moravek,vaclav.plajt,marek.simunek}@firma.seznam.cz
> >>
> >> Resources:
> >> [1] Beam Summit London presentation:
> >>
> https://docs.google.com/presentation/d/1SagpmzJ-tUQki5VsQOEEEUyi_LXRJdG_3OBLdjBKoh4/edit?usp=sharing
> >> [2] Documentation:
> >>
> https://github.com/seznam/beam/blob/dsl-euphoria/website/src/documentation/sdks/euphoria.md
> >> [3] Design Document: https://s.apache.org/beam-euphoria
> >> [4] ASF Jira Issue: https://issues.apache.org/jira/browse/BEAM-3900
> >> [5] Pull Request: https://github.com/apache/beam/pull/6601
> >> [6] Original proposal:
> >>
> http://mail-archives.apache.org/mod_mbox/beam-dev/201712.mbox/%3ccajjqkhnrp1z8atteogmpfkqxrcjeanb3ykowvvtnwyrvv_-...@mail.gmail.com%3e
> >>
> >>
> >>
> >
>


Re: [Proposal] Euphoria DSL - looking for reviewers

2018-10-10 Thread David Morávek
Anton:
All of the points are correct, with one minor exception. We are
currently moving our production workloads from Euphoria
<https://github.com/seznam/euphoria> to Beam (using the DSL), but we are
hitting scalability issues with the current Spark runner, so it is not
technically used in production yet. Everything behaves correctly in the
staging environment, where the runner can handle the workload.

Kenn:
here is the IP Clearance document
https://gist.github.com/dmvk/80acb0579f196e18c02a4e280978d445

Thanks,
David

On Wed, Oct 10, 2018 at 11:30 PM Kenneth Knowles  wrote:

> I just glanced through it to make sure things are in the right place and
> build set up right and that all LGTM.
>
> We need to file the IP Clearance to finish the process that Davor started.
> Please fill the XML template at
> http://svn.apache.org/repos/asf/incubator/public/trunk/content/ip-clearance/ip-clearance-template.xml
> then I will review and file it in SVN.
>
> Kenn
>
> On Wed, Oct 10, 2018 at 2:15 PM Anton Kedin  wrote:
>
>> I think the code looks good and we should probably just merge it (unless
>> there are other blockers, e.g. formal approvals), considering:
>>  - it has been reviewed;
>>  - it is tested and used in production;
>>  - it was discussed on the list and there were no objections to having it
>> as part of Beam;
>>  - it is a standalone extension and doesn't interfere with Beam Java SDK,
>> if I didn't miss anything;
>>  - it has people working on it and supporting it;
>>
>> All other issues can probably be sorted out in normal Beam process.
>>
>> Regards,
>> Anton
>>
>> On Wed, Oct 10, 2018 at 5:57 AM David Morávek 
>> wrote:
>>
>>> Hello Max,
>>>
>>> It would be great if you can do more of a "general" review, the code
>>> base is fairly large, well tested and it was already reviewed internally by
>>> several people.
>>>
>>> We would like to have the overall approach and design decisions
>>> validated by the community and get some inputs on what could be improved
>>> and if we are headed the right direction.
>>>
>>> Thanks,
>>> David
>>>
>>> On Wed, Oct 10, 2018 at 2:21 PM Maximilian Michels 
>>> wrote:
>>>
>>>> That is a huge PR! :) Euphoria looks great. Especially for people
>>>> coming
>>>> from Flink/Spark. I'll check out the documentation.
>>>>
>>>> Do you have any specific code parts which you want to have reviewed?
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On 10.10.18 10:30, Jean-Baptiste Onofré wrote:
>>>> > Hi,
>>>> >
>>>> > Thanks for all the work you are doing on this DSL !
>>>> >
>>>> > I tried to follow the features branch for a while. I'm still committed
>>>> > to  move forward on that front,  but more reviewers would be great.
>>>> >
>>>> > Regards
>>>> > JB
>>>> >
>>>> > On 10/10/2018 10:26, Plajt, Vaclav wrote:
>>>> >> Hello Beam devs,
>>>> >> we finished our main goals in development of Euphoria DSL. It is
>>>> Easy to
>>>> >> use Java 8 API build on top of the Beam's Java SDK. API provides a
>>>> >> high-level abstraction of data transformations, with focus on the
>>>> Java 8
>>>> >> language features (e.g. lambdas and streams). It is fully
>>>> inter-operable
>>>> >> with existing Beam SDK and convertible back and forth. It allows fast
>>>> >> prototyping through use of (optional) Kryo based coders and can be
>>>> >> seamlessly integrated into existing Beam Pipelines.
>>>> >>
>>>> >> Now we believe that it is the time to start discussion about it with
>>>> the
>>>> >> community. Which will hopefully lead to vote about adapting it into
>>>> >> Apache Beam project. Most of main ideas and development goals were
>>>> >> presented in Beam Summit in London [1].
>>>> >>
>>>> >> We are looking for reviewers within the community. Please start with
>>>> >> documentation [2] or design document [3]. Our contribution is
>>>> divided to
>>>> >> two modules: `org.apache.beam:beam-sdks-java-extensions-euphoria` and
>>>> >> `org.apache.beam:beam-sdks-java-extensions-kryo`. Rest of the code
>>>> base
>

Re: [Proposal] Euphoria DSL - looking for reviewers

2018-10-14 Thread David Morávek
Thanks Kenn and Reuven!

This brings up the question of how we should proceed with further
development. Up until now, we made all changes in our own repository, which
was very flexible, as we could do code reviews and PR merges ourselves.

We would love to take full responsibility for the newly created modules,
because we have put a great deal of effort into their development over the
years.

Would it be possible to gain commit rights for these modules, so we could
maintain them without having to bother a committer with each patch or
improvement?

D.

On Sun, Oct 14, 2018 at 10:48 AM Reuven Lax  wrote:

> This is a brand new extension, so I don't think it's necessary for a Beam
> committer to review every line of this before merging. A committer should
> ensure that files are in the correct places, IP clearance is done, etc.,
> and then I  think it's fine to merge.
>
> I do think this code needs to be reviewed in detail, but I think it's
> sufficient to trust the Euphoria authors to do this review themselves. If
> the code has already been peer reviewed between the Euphoria authors, I
> feel like Beam's review before submit policy has been satisfied.
>
> Reuven
>
> On Wed, Oct 10, 2018 at 1:26 AM Plajt, Vaclav <
> vaclav.pl...@firma.seznam.cz> wrote:
>
>> Hello Beam devs,
>> we finished our main goals in development of Euphoria DSL. It is Easy to
>> use Java 8 API build on top of the Beam's Java SDK. API provides a
>> high-level abstraction of data transformations, with focus on the Java 8
>> language features (e.g. lambdas and streams). It is fully inter-operable
>> with existing Beam SDK and convertible back and forth. It allows fast
>> prototyping through use of (optional) Kryo based coders and can be
>> seamlessly integrated into existing Beam Pipelines.
>>
>> Now we believe that it is the time to start discussion about it with the
>> community. Which will hopefully lead to vote about adapting it into Apache
>> Beam project. Most of main ideas and development goals were presented in
>> Beam Summit in London [1].
>>
>> We are looking for reviewers within the community. Please start with
>> documentation [2] or design document [3]. Our contribution is divided to
>> two modules: `org.apache.beam:beam-sdks-java-extensions-euphoria` and
>> `org.apache.beam:beam-sdks-java-extensions-kryo`. Rest of the code base
>> remains untouched.
>> All the checks in MR [5] are passing with exception of "Website
>> PreCommit". Which seems to be broken, little help here would be appreciated.
>>
>> Thank you
>> We are looking forward for your feedback.
>> {david.moravek,vaclav.plajt,marek.simunek}@firma.seznam.cz
>>
>> Resources:
>> [1] Beam Summit London presentation:
>> https://docs.google.com/presentation/d/1SagpmzJ-tUQki5VsQOEEEUyi_LXRJdG_3OBLdjBKoh4/edit?usp=sharing
>> [2] Documentation:
>> https://github.com/seznam/beam/blob/dsl-euphoria/website/src/documentation/sdks/euphoria.md
>> [3] Design Document: https://s.apache.org/beam-euphoria
>> [4] ASF Jira Issue: https://issues.apache.org/jira/browse/BEAM-3900
>> [5] Pull Request: https://github.com/apache/beam/pull/6601
>> [6] Original proposal:
>> http://mail-archives.apache.org/mod_mbox/beam-dev/201712.mbox/%3ccajjqkhnrp1z8atteogmpfkqxrcjeanb3ykowvvtnwyrvv_-...@mail.gmail.com%3e
>>
>>
>>
>>
>


Re: [Call for items] October Beam Newsletter

2018-10-17 Thread David Morávek
Thomas, thanks for the reminder! I've added the following:
- Euphoria DSL
- HadoopFormatIO
- Beam meetup Prague, Czech Republic

On Wed, Oct 17, 2018 at 4:32 PM Etienne Chauchot 
wrote:

> Hi Rose,
> Thanks for the reminder, I added
> - ongoing RabbitMQ IO
> - and already done Graphite metrics sink.
>
> Etienne
>
> Le mardi 16 octobre 2018 à 09:04 -0700, Thomas Weise a écrit :
>
> Since the newsletter is still pending, we can probably also mention the
> just merged Euphoria Java 8 DSL?
>
> https://issues.apache.org/jira/browse/BEAM-3900
>
>
> On Tue, Oct 16, 2018 at 6:21 AM Maximilian Michels  wrote:
>
> Hi Rose,
>
> A bit late but since the newsletter does not seem to be out yet, I added
> some items for the Portable Flink Runner.
>
> Cheers,
> Max
>
> On 08.10.18 18:59, Rose Nguyen wrote:
> > Hi Beamers:
> >
> > So much has been going on that it's time to sync up again in the October
> > Beam Newsletter [1]! :)
> >
> > *Add the highlights from September to now (or planned events and talks)
> > that you want to share with the community by 10/14 11:59 p.m. PDT.*
> >
> > We will collect the notes via Google docs but send out the final version
> > directly to the user mailing list. If you do not know how to format
> > something, it is OK to just put down the info and I will edit. I'll ship
> > out the newsletter on 10/15.
> >
> > [1]
> >
> https://docs.google.com/document/d/1KWk-pgq0_UR8PrJFstuRPb-dYtW4WspBwMJhfEPGYIM
> >
> > --
> > Rose Thị Nguyễn
>
>


Re: [Proposal] Euphoria DSL - looking for reviewers

2018-10-17 Thread David Morávek
That's great news. Thank you!

D.

On Tue, Oct 16, 2018 at 6:06 PM Thomas Weise  wrote:

> Congrats to the Euphoria team!
>
> On Tue, Oct 16, 2018 at 8:51 AM Kenneth Knowles  wrote:
>
>> Merged. Welcome to the repo :-)
>>
>> Kenn
>>
>> On Thu, Oct 11, 2018 at 10:06 AM Kenneth Knowles  wrote:
>>
>>> I've filed the IP Clearance. I'll report back here.
>>>
>>> Kenn
>>>
>>> On Wed, Oct 10, 2018 at 3:33 PM David Morávek 
>>> wrote:
>>>
>>>>
>>>>
>>>> Anton:
>>>> All of the points are correct, with one minor exception. We are
>>>> currently moving our production workloads from Euphoria
>>>> <https://github.com/seznam/euphoria> to Beam (using the DSL), but we
>>>> are hitting scalability issues of the current spark runner, so it is not
>>>> technically used in production yet. Everything behaves correctly in the
>>>> staging environment, where runner can handle the workload.
>>>>
>>>> Kenn:
>>>> here is the IP Clearance document
>>>> https://gist.github.com/dmvk/80acb0579f196e18c02a4e280978d445
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> On Wed, Oct 10, 2018 at 11:30 PM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> I just glanced through it to make sure things are in the right place
>>>>> and build set up right and that all LGTM.
>>>>>
>>>>> We need to file the IP Clearance to finish the process that Davor
>>>>> started. Please fill the XML template at
>>>>> http://svn.apache.org/repos/asf/incubator/public/trunk/content/ip-clearance/ip-clearance-template.xml
>>>>> then I will review and file it in SVN.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Oct 10, 2018 at 2:15 PM Anton Kedin  wrote:
>>>>>
>>>>>> I think the code looks good and we should probably just merge it
>>>>>> (unless there are other blockers, e.g. formal approvals), considering:
>>>>>>  - it has been reviewed;
>>>>>>  - it is tested and used in production;
>>>>>>  - it was discussed on the list and there were no objections to
>>>>>> having it as part of Beam;
>>>>>>  - it is a standalone extension and doesn't interfere with Beam Java
>>>>>> SDK, if I didn't miss anything;
>>>>>>  - it has people working on it and supporting it;
>>>>>>
>>>>>> All other issues can probably be sorted out in normal Beam process.
>>>>>>
>>>>>> Regards,
>>>>>> Anton
>>>>>>
>>>>>> On Wed, Oct 10, 2018 at 5:57 AM David Morávek <
>>>>>> david.mora...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Max,
>>>>>>>
>>>>>>> It would be great if you can do more of a "general" review, the code
>>>>>>> base is fairly large, well tested and it was already reviewed 
>>>>>>> internally by
>>>>>>> several people.
>>>>>>>
>>>>>>> We would like to have the overall approach and design decisions
>>>>>>> validated by the community and get some inputs on what could be improved
>>>>>>> and if we are headed the right direction.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> David
>>>>>>>
>>>>>>> On Wed, Oct 10, 2018 at 2:21 PM Maximilian Michels 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That is a huge PR! :) Euphoria looks great. Especially for people
>>>>>>>> coming
>>>>>>>> from Flink/Spark. I'll check out the documentation.
>>>>>>>>
>>>>>>>> Do you have any specific code parts which you want to have reviewed?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On 10.10.18 10:30, Jean-Baptiste Onofré wrote:
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > Thanks for all the work you are doing on this DSL !
>>>>>>>> >
>>>>>>>>

[PROPOSAL] Move sorting to sdks-java-core

2018-10-17 Thread David Morávek
Hello,

I want to summarize my thoughts on per-key value sorting.

Currently we have a separate module for the sorting extension. The extension
contains the *SortValues* transformation and implementations of different
sorters.

Performance-wise, it would be great to be able *to delegate sorting to a
runner* if it supports a sort-based shuffle. In order to do so, we should
*move the SortValues transformation to sdks-java-core*, so a runner can
easily provide its own implementation.

A robust implementation is needed mainly for building HFiles for the HBase
bulk load. When using the external sorter, we often sort the whole data set
twice (the shuffle may have already done the job).

SortValues cannot use a custom comparator, because we want to be able to
push the sorting logic down to a byte-based shuffle.

The usage of the SortValues transformation is a little bit confusing. I think
we should add a *SortValues.perKey* method, which accepts a secondary-key
extractor and coder, as the usage would be easier to understand. Also, this
explicitly states that we sort values *per key* only and that we sort using
an *encoded secondary key*. Example usage:


PCollection<KV<String, Long>> input = ...;
input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))
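
For contrast, here is a minimal sketch of how the existing sorter extension
is used today, after a GroupByKey; the element types and the local runner
setup are illustrative only:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SortValuesTodaySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Elements are KV<primaryKey, KV<secondaryKey, value>>; SortValues sorts
    // the grouped values per primary key by the encoded secondary key.
    PCollection<KV<String, KV<Long, String>>> input =
        p.apply(
            Create.of(
                    KV.of("user1", KV.of(3L, "c")),
                    KV.of("user1", KV.of(1L, "a")),
                    KV.of("user2", KV.of(2L, "b")))
                .withCoder(
                    KvCoder.of(
                        StringUtf8Coder.of(),
                        KvCoder.of(
                            BigEndianLongCoder.of(), StringUtf8Coder.of()))));
    PCollection<KV<String, Iterable<KV<Long, String>>>> sorted =
        input
            .apply(GroupByKey.<String, KV<Long, String>>create())
            .apply(
                SortValues.<String, Long, String>create(
                    BufferedExternalSorter.options()));
    p.run().waitUntilFinish();
  }
}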

What do you think? Is this the right direction?

Thanks for the comments!

Links:
-
http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E


Re: [PROPOSAL] Move sorting to sdks-java-core

2018-10-17 Thread David Morávek
We can always fall back to the external sorter in the case of merging
windows. I reckon that in this case values usually fit in memory, so it would
not be an issue.

In the case of non-merging windows, the runner implementation would probably
need to group elements by window as well during the shuffle.
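
As a side note, the sorted merge of two sorted buffers that Reuven mentions
below is itself cheap to express; a minimal sketch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SortedMergeSketch {
  // Merges two already-sorted lists in a single pass; this is the kind of
  // per-key merge that window merging would need on top of a sorted shuffle,
  // instead of re-sorting everything.
  static List<Long> mergeSorted(List<Long> a, List<Long> b) {
    List<Long> out = new ArrayList<>(a.size() + b.size());
    Iterator<Long> ia = a.iterator();
    Iterator<Long> ib = b.iterator();
    Long na = ia.hasNext() ? ia.next() : null;
    Long nb = ib.hasNext() ? ib.next() : null;
    while (na != null || nb != null) {
      if (nb == null || (na != null && na <= nb)) {
        out.add(na);
        na = ia.hasNext() ? ia.next() : null;
      } else {
        out.add(nb);
        nb = ib.hasNext() ? ib.next() : null;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(
        mergeSorted(Arrays.asList(1L, 3L, 5L), Arrays.asList(2L, 4L)));
  }
}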

On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax  wrote:

> One concern would be merging windows. This happens after shuffle, so even
> if the shuffle were sorted you would need to do a sorted merge of two
> sorted buffers.
>
> On Wed, Oct 17, 2018 at 2:08 PM David Morávek 
> wrote:
>
>> Hello,
>>
>> I want to summarize my thoughts on the per key value sorting.
>>
>> Currently we have a separate module for sorting extension. The extension
>> contains *SortValues* transformation and implementations of different
>> sorters.
>>
>> Performance-wise it would be great to be able* to delegate sorting to a
>> runner* if it supports sort based shuffle. In order to do so, we should *move
>> SortValues transformation to sdks-java-core*, so a runner can easily
>> provide its own implementation.
>>
>> The robust implementation is needed mainly for building of HFiles for the
>> HBase bulk load. When using external sorter, we often sort the whole data
>> set twice (shuffle may already did a job).
>>
>> SortValues can not use custom comparator, because we want to be able to
>> push sorting logic down to a byte based shuffle.
>>
>> The usage of SortValues transformation is little bit confusing. I think
>> we should add a *SortValues.perKey* method, which accepts a secondary
>> key extractor and coder, as the usage would be easier to understand. Also,
>> this explicitly states, that we sort values *perKey* only and that we
>> sort using an *encoded secondary key*. Example usage:
>>
>>
>> PCollection<KV<String, Long>> input = ...;
>> input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))
>>
>> What do you think? Is this the right direction?
>>
>> Thanks for the comments!
>>
>> Links:
>> -
>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>
>


Re: [PROPOSAL] Move sorting to sdks-java-core

2018-10-18 Thread David Morávek
Kenn, I believe we should not introduce a Hadoop dependency into either the
SDKs or the runners. We may split sorting into two modules: one with the
transformation + in-memory implementation (this is the part I'd love to see
become part of sdks-java-core) and a second module with the more robust
external sorter (with the Hadoop dependency).

Does this make sense?


On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin  wrote:

> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles  wrote:
>
>> The runner can always just depend on the sorter to do it the legacy way
>> by class matching; it shouldn't incur other dependency penalties... but now
>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>> price to pay for a user in any event. Are those Hadoop deps reasonably
>> self-contained?
>>
>
> Nice catch, Kenn! This is indeed why we didn't originally include the
> Sorter in core. The Hadoop deps have an enormous surface, or did at the
> time.
>
> Dan
>
>
>>
>> Kenn
>>
>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik  wrote:
>>
>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>> executed via portability since the Runner will be able to perform
>>> PTransform replacement and optimization based upon the URN of the transform
>>> and its payload so it would never need to have the "Sorter" class in its
>>> classpath.
>>>
>>> I'm ambivalent about whether merging it now is worth it.
>>>
>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek 
>>> wrote:
>>>
>>>> We can always fall back to the External sorter in case of merging
>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>> not be an issue.
>>>>
>>>> In case of non-merging windows, runner implementation would probably
>>>> require to group elements also by window during shuffle.
>>>>
>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax  wrote:
>>>>
>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>> even if the shuffle were sorted you would need to do a sorted merge of two
>>>>> sorted buffers.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>
>>>>>> Currently we have a separate module for sorting extension. The
>>>>>> extension contains *SortValues* transformation and implementations
>>>>>> of different sorters.
>>>>>>
>>>>>> Performance-wise it would be great to be able* to delegate sorting
>>>>>> to a runner* if it supports sort based shuffle. In order to do so,
>>>>>> we should *move SortValues transformation to sdks-java-core*, so a
>>>>>> runner can easily provide its own implementation.
>>>>>>
>>>>>> The robust implementation is needed mainly for building of HFiles for
>>>>>> the HBase bulk load. When using external sorter, we often sort the whole
>>>>>> data set twice (shuffle may already did a job).
>>>>>>
>>>>>> SortValues can not use custom comparator, because we want to be able
>>>>>> to push sorting logic down to a byte based shuffle.
>>>>>>
>>>>>> The usage of SortValues transformation is little bit confusing. I
>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>> understand. Also, this explicitly states, that we sort values
>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>> Example usage:
>>>>>>
>>>>>>
>>>>>> PCollection<KV<String, Long>> input = ...;
>>>>>> input.apply(SortValues.perKey(KV::getValue,
>>>>>> BigEndianLongCoder.of()))
>>>>>>
>>>>>> What do you think? Is this the right direction?
>>>>>>
>>>>>> Thanks for the comments!
>>>>>>
>>>>>> Links:
>>>>>> -
>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>
>>>>>


Re: [DISCUSS] Beam public roadmap

2018-10-20 Thread David Morávek
+1 this looks like a great starting point. It is always beneficial for the user 
to know where the project is headed.

Sent from my iPhone

> On 20 Oct 2018, at 03:09, Ahmet Altay  wrote:
> 
> I looked at #6718, I think this is great as a starting point and not just a 
> mock. I particularly like that:
> - It divides the roadmap along major component areas (SDKs, runners, 
> portability). This is good because (a) it provides a complete top down 
> picture and (b) allows groups of people working in these areas to build their 
> own roadmaps. This division would empower people working in those components 
> to build mini-roadmaps. This make sense to me because people with most 
> context in those components would likely to already have some vision 
> somewhere about the future of those components and they are already working 
> towards realizing those. Now, they can share it with rest of the community 
> and users in a structured way.
> - The other good bit is that, there is a index page that pulls major bits 
> from each individual roadmap and provides a coherent list of where the 
> project is going. It would be very easy for users to just look at this page 
> and get a sense of the where the project is going.
> 
> I believe this break down makes it easier for the most folks in the community 
> to participate in the process of building and roadmap. In my opinion, we can 
> merge Kenn's _mock_ and ask people to start filling in the areas they care 
> about.
> 
> Ahmet
> 
>> On Wed, Oct 17, 2018 at 7:23 AM, Kenneth Knowles  wrote:
>> I mocked up a little something on https://github.com/apache/beam/pull/6718.
>> 
>> Kenn
>> 
>>> On Sun, Oct 14, 2018 at 5:33 PM Thomas Weise  wrote:
>>> Indeed, our current in-progress subsection isn't visible enough. It is also 
>>> too coarse grained. Perhaps we can replace it with a list of current and 
>>> proposed initiatives?
>>> 
>>> I could see the index live on the web site, but would prefer individual, 
>>> per-initiative pages to live on the wiki. That way they are easy to 
>>> maintain by respective contributors. 
>>> 
>>> Thanks
>>> 
 On Fri, Oct 12, 2018 at 8:06 PM Kenneth Knowles  wrote:
 I think we can easily steer clear of those concerns. It should not look 
 like a company's roadmap. This is just a term that users search for and 
 ask for. It might be an incremental improvement on 
 https://beam.apache.org/contribute/#works-in-progress to present it more 
 for users, to just give them a picture of the trajectory. For example, 
 Beam Python on Flink would probably be of considerable interest but it is 
 buried at https://beam.apache.org/contribute/portability/#status.
 
 Kenn
 
> On Fri, Oct 12, 2018 at 6:49 PM Thomas Weise  wrote:
> As I understand it the term "roadmap" is not favored. It may convey the 
> impression of an outside entity that controls what is being worked on and 
> when. At least in theory contributions are volunteer work and individuals 
> decide what they take up. There are projects that have a "list of 
> initiatives" or "improvement proposals" that are either in idea phase or 
> ongoing. Those provide an idea what is on the radar and perhaps that is a 
> sufficient for those looking for the overall direction? 
> 
> 
>> On Fri, Oct 12, 2018 at 3:08 PM Kenneth Knowles  wrote:
>> Did some searching about to see what other projects have done. Most OSS 
>> projects with open governance don't actually have such a thing AFAICT. 
>> Here are some from various [types of] projects. Please contribute links 
>> for any project you can think of that might be interesting examples.
>> 
>> My personal favorite for readability and content is Bazel. It does not 
>> do timelines, but says what they are most focused on. It has fewer, 
>> larger, items than our "Ongoing Projects" section. Then some breakouts 
>> into roadmaps for sub-bits.
>> 
>> Apache Flink (roadmap doc is stale, FLIPs nice and readable though)
>>  - 
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan
>>  - 
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> 
>> Apache Spark (no roadmap doc I could find, SPIPs not in real readable 
>> format):
>>  - https://spark.apache.org/improvement-proposals.html
>> 
>> Apache Apex
>>  - http://apex.apache.org/roadmap.html
>> 
>> Apache Calcite Avatica
>>  - https://calcite.apache.org/avatica/docs/roadmap.html
>> 
>> Apache Kafka
>>  - https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>> 
>> Tensorflow
>>  - https://www.tensorflow.org/community/roadmap
>> 
>> Kubernetes
>>  - https://github.com/kubernetes/kubernetes/milestones
>> 
>> Firefox
>>  - https://wiki.mozilla.org/Firefox/Roadmap
>> 
>> Servo
>>  - https://github.com/se

Re: [PROPOSAL] Move sorting to sdks-java-core

2018-10-22 Thread David Morávek
What should be the next step? I guess we all agree that the Hadoop dependency
should be split out. Then we're left with the SortValues transform + the
in-memory implementation. I'm OK with keeping this as a separate module, as
this would discourage users from using sorting in their business logic.

Robert:
Regarding the introduction of a new method on the coders: how about creating
a new interface, e.g. *OrderPreservingCoder*? Then you can require this
interface in your method signature and the IDE will autocomplete all of the
possible implementations that you can use. With a new method, the user needs
to know which implementations are order preserving, and that can be really
confusing. I think the same thinking should apply to other coder properties.
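
Purely as an illustration of the idea (none of this exists in Beam today),
the marker interface and one conforming coder might look roughly like this:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;

// Hypothetical marker: the coder's byte encoding preserves the natural
// ordering of the encoded values, so a byte-based shuffle can sort by it.
interface OrderPreservingCoder {}

// A fixed-width big-endian encoding keeps numeric order for non-negative
// longs, so such a coder could carry the marker. Composite coders (KvCoder,
// IterableCoder, ...) could only carry it if all their component coders did.
class OrderedLongCoder extends AtomicCoder<Long>
    implements OrderPreservingCoder {
  @Override
  public void encode(Long value, OutputStream outStream) throws IOException {
    new DataOutputStream(outStream).writeLong(value);
  }

  @Override
  public Long decode(InputStream inStream) throws IOException {
    return new DataInputStream(inStream).readLong();
  }

  @Override
  public void verifyDeterministic() {}
}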

D.



On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick  wrote:

> FYI: the BufferedExternalSorter depends on Hadoop client libraries
> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
> the Hadoop service -- because the  ExternalSorter
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
> uses Hadoop's SequenceFile
> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
>  for
> on-disk sorting.
>
>
>
> On Thu, 18 Oct 2018 at 11:19 David Morávek 
> wrote:
>
>> Kenn, I believe we should not introduce hadoop dependency to neither sdks
>> or runners. We may split sorting in two packages, one with the
>> transformation + in memory implementation (this is the part I'd love to see
>> become part of sdks-java-core) and second module with more robust external
>> sorter (with hadoop dep).
>>
>> Does this make sense?
>>
>>
>> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin  wrote:
>>
>>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles  wrote:
>>>
>>>> The runner can always just depend on the sorter to do it the legacy way
>>>> by class matching; it shouldn't incur other dependency penalties... but now
>>>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>>>> price to pay for a user in any event. Are those Hadoop deps reasonably
>>>> self-contained?
>>>>
>>>
>>> Nice catch, Kenn! This is indeed why we didn't originally include the
>>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>>> time.
>>>
>>> Dan
>>>
>>>
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik  wrote:
>>>>
>>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>>> executed via portability since the Runner will be able to perform
>>>>> PTransform replacement and optimization based upon the URN of the 
>>>>> transform
>>>>> and its payload so it would never need to have the "Sorter" class in its
>>>>> classpath.
>>>>>
>>>>> I'm ambivalent about whether merging it now is worth it.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek 
>>>>> wrote:
>>>>>
>>>>>> We can always fall back to the External sorter in case of merging
>>>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>>>> not be an issue.
>>>>>>
>>>>>> In case of non-merging windows, runner implementation would probably
>>>>>> require to group elements also by window during shuffle.
>>>>>>
>>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>>>> even if the shuffle were sorted you would need to do a sorted merge of 
>>>>>>> two
>>>>>>> sorted buffers.
>>>>>>>
>>>>>>> On Wed, Oct 17, 2018 at 2:08 PM David Morávek <
>>>>>>> david.mora...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I want to summarize my thoughts on the per key value sorting.
>>>>>>>>
>>>>>>>> Currently we have a separate module for sorting extension. The
>>>>>>>> extension contains *SortValues* transformation and implementations
>>>>>>>> of different sorter

Re: [PROPOSAL] Move sorting to sdks-java-core

2018-10-22 Thread David Morávek
Lukasz, you are right. I didn't think about structured coders. Thanks

On Mon, Oct 22, 2018 at 7:40 PM Lukasz Cwik  wrote:

> I don't believe an interface will work because KvCoder/ListCoder/... would
> only be order preserving if their components coders were order preserving.
>
> On Mon, Oct 22, 2018 at 8:52 AM David Morávek 
> wrote:
>
>> What should be the next step? I guess we all agree that hadoop dependency
>> should be split out. Then we're left with the SortValues transform +
>> in memory implementation. I'm ok with keeping this as a separate module, as
>> this would discourage users to use sorting in their business logic.
>>
>> Robert:
>> ad introduction of a new method for the coders. How about creating a new
>> interface, e.g. *OrderPreservingCoder*? Then you can require this
>> interface in your method signature and IDE will autocomplete all of the
>> possible implementations that you can use. In case of a new method, user
>> needs to know which implementations are order preserving and it can be
>> really confusing. I think the same thinking should apply to other coder
>> properties.
>>
>> D.
>>
>>
>>
>> On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick  wrote:
>>
>>> FYI: the BufferedExternalSorter depends on Hadoop client libraries
>>> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
>>> the Hadoop service -- because the  ExternalSorter
>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
>>> uses Hadoop's SequenceFile
>>> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
>>>  for
>>> on-disk sorting.
>>>
>>>
>>>
>>> On Thu, 18 Oct 2018 at 11:19 David Morávek 
>>> wrote:
>>>
>>>> Kenn, I believe we should not introduce hadoop dependency to neither
>>>> sdks or runners. We may split sorting in two packages, one with the
>>>> transformation + in memory implementation (this is the part I'd love to see
>>>> become part of sdks-java-core) and second module with more robust external
>>>> sorter (with hadoop dep).
>>>>
>>>> Does this make sense?
>>>>
>>>>
>>>> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin 
>>>> wrote:
>>>>
>>>>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> The runner can always just depend on the sorter to do it the legacy
>>>>>> way by class matching; it shouldn't incur other dependency penalties... 
>>>>>> but
>>>>>> now that I look briefly, the sorter depends on Hadoop bits. That seems a
>>>>>> heavy price to pay for a user in any event. Are those Hadoop deps
>>>>>> reasonably self-contained?
>>>>>>
>>>>>
>>>>> Nice catch, Kenn! This is indeed why we didn't originally include the
>>>>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>>>>> time.
>>>>>
>>>>> Dan
>>>>>
>>>>>
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik  wrote:
>>>>>>
>>>>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>>>>> executed via portability since the Runner will be able to perform
>>>>>>> PTransform replacement and optimization based upon the URN of the 
>>>>>>> transform
>>>>>>> and its payload so it would never need to have the "Sorter" class in its
>>>>>>> classpath.
>>>>>>>
>>>>>>> I'm ambivalent about whether merging it now is worth it.
>>>>>>>
>>>>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <
>>>>>>> david.mora...@gmail.com> wrote:
>>>>>>>
>>>>>>>> We can always fall back to the External sorter in case of merging
>>>>>>>> windows. I reckon in this case, values usually fit in memory, so it 
>>>>>>>> would
>>>>>>>> not be an issue.
>>>>>>>>
>>>>>>>> In case of non-merging windows, runner 

Re: [DISCUSS] Publish vendored dependencies independently

2018-10-23 Thread David Morávek
+1 This should improve build times a lot. It would be great if vendored
deps could stay in the main repository.

D.

On Tue, Oct 23, 2018 at 12:21 PM Maximilian Michels  wrote:

> Looks great, Kenn!
>
> > Max: what is the story behind having a separate flink-shaded repo? Did
> that make it easier to manage in some way?
>
> Better separation of concerns, but I don't think releasing the shaded
> artifacts from the main repo is a problem. I'd even prefer not to split
> up the repo because it makes updates to the vendored dependencies
> slightly easier.
>
> On 23.10.18 03:25, Kenneth Knowles wrote:
> > OK, I've filed https://issues.apache.org/jira/browse/BEAM-5819 to
> > collect sub-tasks. This has enough upsides throughout lots of areas of
> > the project that even though it is not glamorous it seems pretty
> > valuable to start on immediately. And I want to find out if there's a
> > pitfall lurking.
> >
> > Max: what is the story behind having a separate flink-shaded repo? Did
> > that make it easier to manage in some way?
> >
> > Kenn
> >
> > On Mon, Oct 22, 2018 at 2:55 AM Maximilian Michels  > > wrote:
> >
> > +1 for publishing vendored Jars independently. It will improve build
> > time and ease IntelliJ integration.
> >
> > Flink also publishes shaded dependencies separately:
> >
> > - https://github.com/apache/flink-shaded
> > - https://issues.apache.org/jira/browse/FLINK-6529
> >
> > AFAIK their main motivation was to get rid of duplicate shaded
> classes
> > on the classpath. We don't appear to have that problem because we
> > already have a separate "vendor" project.
> >
> >  >  - With shading, it is hard (impossible?) to step into dependency
> > code in IntelliJ's debugger, because the actual symbol at runtime
> > does not match what is in the external jars
> >
> > This would be solved by releasing the sources of the shaded jars.
> >  From a
> > legal perspective, this could be problematic as alluded to here:
> > https://github.com/apache/flink-shaded/issues/25
> >
> > -Max
> >
> > On 20.10.18 01:11, Lukasz Cwik wrote:
> >  > I have tried several times to improve the build system and
> intellij
> >  > integration and each attempt ended with little progress when
> dealing
> >  > with vendored code. My latest attempt has been the most promising
> > where
> >  > I take the vendored classes/jars and decompile them generating the
> >  > source that Intellij can then use. I have a branch[1] that
> > demonstrates
> >  > the idea. It works pretty well (and up until a change where we
> > started
> >  > vendoring gRPC, was impractical to do. Instructions to try it out
> > are:
> >  >
> >  > // Clean up any remnants of prior builds/intellij projects
> >  > git clean -fdx
> >  > // Generated the source for vendored/shaded modules
> >  > ./gradlew decompile
> >  >
> >  > // Remove the "generated" Java sources for protos so they don't
> > conflict with the decompiled sources.
> >  > rm -rf model/pipeline/build/generated/source/proto
> >  > rm -rf model/job-management/build/generated/source/proto
> >  > rm -rf model/fn-execution/build/generated/source/proto
> >  > // Import the project into Intellij, most code completion now
> > works still some issues with a few classes.
> >  > // Note that the Java decompiler doesn't generate valid source so
> > still need to delegate to Gradle for build/run/test actions
> >  > // Other decompilers may do a better/worse job but haven't tried
> > them.
> >  >
> >  >
> >  > The problems that I face are that the generated Java source from
> the
> >  > protos and the decompiled source from the compiled version of that
> >  > source post shading are both being imported as content roots and
> > then
> >  > conflict. Also, the CFR decompiler isn't producing valid source,
> if
> >  > people could try others and report their mileage, we may find one
> > that
> >  > works and then we would be able to use intellij to build/run our
> > code
> >  > and not need to delegate all our build/run/test actions to Gradle.
> >  >
> >  > After all these attempts I have done, vendoring the dependencies
> > outside
> >  > of the project seems like a sane approach and unless someone
> > wants to
> >  > take a stab at the best progress I have made above, I would go
> > with what
> >  > Kenn is suggesting even though it will mean that we will need to
> > perform
> >  > releases every time we want to change the version of one of our
> > vendored
> >  > dependencies.
> >  >
> >  > 1: https://github.com/lukecwik/incubator-beam/tree/intellij
> >  >
> >  >
> >  > On Fri, Oct 19, 2018 at 10:43 AM Kenneth Knowles  > 
> >  > 

Re: Growing Beam -- A call for ideas? What is missing? What would be good to see?

2018-10-27 Thread David Morávek
Hello Alejandro,

+1 for a Java implementation, even though this would probably require more
effort on your side.

The main problem with Scio is that it lives outside the Beam code base and
depends on a specific version of the Beam SDK. The sketching extension (and
any other module in the Beam code base), on the other hand, uses the Beam SDK
built from sources (the current checkout), which Scio might not be compatible
with.

D.

On Sat, Oct 27, 2018 at 8:26 AM Alejandro  wrote:

> Hello,
>
> although not exactly your intentions, I am also looking to contribute to
> Beam, but from a code perspective.
>
> I've been discussing with some beam members like Austin and lukasz
> (CCed) on how to integrate https://github.com/elbaulp/DPASF into Beam.
>
> It seems the best place for this algorithms is
> https://github.com/apache/beam/tree/master/sdks/java/extensions/sketching,
> but right now I lack the Beam knowledge that allow my to implement it.
> So I am looking to someone who could help me start. Should I write
> wrappers that interface my Scala code using
> https://github.com/spotify/scio? or re implement all in Java?
>
> Cheers.
>
> On 10/26/2018 11:55 PM, Rose Nguyen wrote:
> > I've heard of many people referring to the Medium posts related to Beam
> > for step-by-step tutorials.
> >
> > https://medium.com/tag/apache-beam/latest
> >
> > On Thu, Oct 25, 2018 at 9:25 PM Austin Bennett
> > mailto:whatwouldausti...@gmail.com>>
> wrote:
> >
> > Hi Beam Devs and Users,
> >
> > Trying to get a sense from the community on the sorts of things we
> > think would be useful to build the community (I am thinking not from
> > an angle of specific code/implementation/functionality, but from a
> > user/usability -- I want to dive in and make real contributions with
> > the code, too, but know I also have the interest and skills to help
> > with education and community aspects, hence my focus on this).
> >
> > I had previously suggested a sort of cookbook for focused and
> > curated examples (code and explination) to help people get started,
> > on-boarding, using Beam to aid getting up and running and
> > accomplishing something worthwhile (and quickly), that seems one way
> > to help grow our user base (and maybe future dev base afterwards
> > those users become enamored), which did get some positive feedback
> > when first put out there.
> >
> > There are many other areas where featuring others sharing successes
> > from having used Beam or little tips can be valuable, Pablo's
> > Awesome Beam is one example of such a
> > collection: https://github.com/pabloem/awesome-beam or even
> > centralizing a general place to find any/all Beam
> > blogs/shared-code/writeups/etc.
> >
> > Certainly there is a place for all sorts of contributions and
> > resources.  What do people on these lists think would be
> > particularly useful?  Trying to get a more focused sense of where we
> > think efforts might be best focused.
> >
> > Please share anything (even semi-)related!?
> >
> > Thanks,
> > Austin
> >
> >
> > P.S.  I realize that those following this list are rather self
> > selecting as well, so this might not be the best forum to figure out
> > what new/novice users need, but I would like to hear what everyone
> > else here thinks could be useful.
> >
> >
> >
> > --
> > Rose Thị Nguyễn
>
> --
> elbauldelprogramador.com
>


[Euphoria] Looking for a reviewer.

2018-11-06 Thread David Morávek
Hello,

I'm looking for a reviewer for [BEAM-5790]
 and also for any upcoming
Euphoria PR which I submit.

It has already been reviewed internally, but it should also be reviewed by
a committer who did not author the code.

It would also be great if another committer became familiar with the code
base (at least through reviews).

Is anyone willing to help?

Thanks,
D.


Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread David Morávek
Jan, we made Kryo optional recently (it is a separate module and is used
only in tests). From a quick look, it seems that we forgot to remove the
compile-time dependency from Euphoria's *build.gradle*. The only "strong"
dependencies I'm aware of are the core SDK and Guava. We'll probably be
adding a dependency on the sketching extension soon.

D.

On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:

> Hi Anton,
> reactions inline.
>
> -- Původní e-mail --
> Od: Anton Kedin 
> Komu: dev@beam.apache.org
> Datum: 30. 11. 2018 18:17:06
> Předmět: Re: [DISCUSS] Structuring Java based DSLs
>
> I think this approach makes sense in general, Euphoria can be the
> implementation detail of SQL, similar to Join Library or core SDK Schemas.
>
> I wonder though whether it would be better to bring Euphoria closer to
> core SDK first, maybe even merge them together. If you look at Reuven's
> recent work around schemas it seems like there are already similarities
> between that and Euphoria's approach, unless I'm missing the point (e.g.
> Filter transforms, FullJoin vs CoGroup... see [2]). And we're already
> switching parts of SQL to those transforms (e.g. SQL Aggregation is now
> implemented by core SDK's Group[3]).
>
>
>
> Yes, these transforms seem to be very similar to those Euphoria has.
> Whether or not to merge Euphoria with core is essentially just a decision
> of the community (in my point of view).
>
>
>
> Adding explicit Schema support to Euphoria will bring it both closer to
> core SDK and make it natural to use for SQL. Can this be a first step
> towards this integration?
>
>
>
> Euphoria currently operates on pure PCollections, so when PCollection has
> a schema, it will be accessible by Euphoria. It makes sense to make use of
> the schema in Euphoria - it seems natural on inputs to Euphoria operators,
> but it might be tricky (not saying impossible) to actually produce
> schema-aware PCollections as outputs from Euphoria operators (generally
> speaking, in special cases that might be possible). Regarding inputs, there
> is actually intention to act on type of PCollection - e.g. when PCollection
> is already of type KV, then it is possible to make key extractor and value
> extractor optional in Euphoria builders, so it feels natural to enable
> changing the builders when a schema-aware PCollection, and make use of the
> provided schema. The rest of Euphoria team might correct me, if I'm wrong.
>
>
>
>
> One question I have is, does Euphoria bring dependencies that are not
> needed by SQL, or does more or less only rely on the core SDK?
>
>
>
> I think the only relevant dependency that Euphoria has besides core SDK is
> Kryo. It is the default coder when no coder is provided, but that could be
> made optional - e.g. the default coder would be supported only if an
> appropriate module would be available. That way I think that Euphoria has
> no special dependencies.
>
>
>
> [1]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
> [2]
> https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
> [3]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179
>
>
>
> On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský  wrote:
>
> Hi community,
>
> I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
> discuss possible development of Java based DSLs currently present in
> Beam. In my knowledge, there are currently two DSLs based on Java SDK -
> Euphoria and SQL. These DSLs currently share only the SDK itself,
> although there might be room to share some more effort. We already know
> that both Euphoria and SQL have need for retractions, but there are
> probably many more features that these two could share.
>
> So, I'd like to open a discussion on what it would cost and what it
> would possibly bring, if instead of the current structure
>
>Java SDK
>
>  |  SQL
>
>  |  Euphoria
>
> these DSLs would be structured as
>
>Java SDK ---> Euphoria ---> SQL
>
> I'm absolutely sure that this would be a great investment and a huge
> change, but I'd like to gather some opinions and general feelings of the
> community about this. Some points to start the discussion from my side
> would be, that structuring DSLs like this has internal logical
> consistency, because each API layer further narrows completeness, but
> brings simpler API for simpler tasks, while adding additional high-level
> view of the data processing pipeline and thus enabling more
> optimizations. On Euphoria side, these are various implementations joins
> (most effective implementation depends on data), pipeline sampling and
> more. Some (or maybe most) of these optimizations would have to be
> implemented i

Re: [VOTE] Accept the Firefly design donation as Beam Mascot - Deadline Mon April 6

2020-04-06 Thread David Morávek
+1 (non-binding)

On Mon, Apr 6, 2020 at 12:51 PM Reza Rokni  wrote:

> +1(non-binding)
>
> On Mon, Apr 6, 2020 at 5:24 PM Alexey Romanenko 
> wrote:
>
>> +1 (non-binding).
>>
>> > On 3 Apr 2020, at 14:53, Maximilian Michels  wrote:
>> >
>> > +1 (binding)
>> >
>> > On 03.04.20 10:33, Jan Lukavský wrote:
>> >> +1 (non-binding).
>> >>
>> >> On 4/2/20 9:24 PM, Austin Bennett wrote:
>> >>> +1 (nonbinding)
>> >>>
>> >>> On Thu, Apr 2, 2020 at 12:10 PM Luke Cwik > >>> > wrote:
>> >>>
>> >>>+1 (binding)
>> >>>
>> >>>On Thu, Apr 2, 2020 at 11:54 AM Pablo Estrada > >>>> wrote:
>> >>>
>> >>>+1! (binding)
>> >>>
>> >>>On Thu, Apr 2, 2020 at 11:19 AM Alex Van Boxel
>> >>>mailto:a...@vanboxel.be>> wrote:
>> >>>
>> >>>Thanks for clearing this up Aizhamal.
>> >>>
>> >>>+1 (non binding)
>> >>>
>> >>>_/
>> >>>_/ Alex Van Boxel
>> >>>
>> >>>
>> >>>On Thu, Apr 2, 2020 at 8:14 PM Aizhamal Nurmamat kyzy
>> >>>mailto:aizha...@apache.org>> wrote:
>> >>>
>> >>>Good point, Alex. Actually Julian and I have talked
>> >>>about producing this kind of guide. It will be
>> >>>delivered as an additional contribution in the follow
>> >>>up. We think this will be a derivative of the original
>> >>>design, and be done after the original is officially
>> >>>accepted.
>> >>>
>> >>>With this vote, we want to accept the Firefly donation
>> >>>as designed [1], and let Julian produce other
>> >>>artifacts using the official Beam mascot later on.
>> >>>
>> >>>[1]
>> https://docs.google.com/document/d/1zK8Cm8lwZ3ALVFpD1aY7TLCVNwlyTS3PXxTV2qQCAbk/edit?usp=sharing
>> >>>
>> >>>
>> >>>On Thu, Apr 2, 2020 at 10:37 AM Alex Van Boxel
>> >>>mailto:a...@vanboxel.be>> wrote:
>> >>>
>> >>>I don't want to be a spoiler... but this vote
>> >>>feels like a final deliverable... but without a
>> >>>style guide as Kenn originally suggested most of
>> >>>use will not be able to adapt the design. This
>> >>>would include:
>> >>>
>> >>>  * frontal view
>> >>>  * side view
>> >>>  * back view
>> >>>
>> >>>actually different posses so we can mix and match.
>> >>>Without this it will never reach the potential of
>> >>>the Go gopher or gRPC Pancakes.
>> >>>
>> >>>Note this is *not* a negative vote but I'm afraid
>> >>>that the use without a guide will be fairly
>> >>>limited as most of use are not designers. Just a
>> >>>concern.
>> >>>
>> >>> _/
>> >>>_/ Alex Van Boxel
>> >>>
>> >>>
>> >>>On Thu, Apr 2, 2020 at 7:27 PM Andrew Pilloud
>> >>>mailto:apill...@apache.org>>
>> >>>wrote:
>> >>>
>> >>>+1, Accept the donation of the Firefly design
>> >>>as Beam Mascot
>> >>>
>> >>>On Thu, Apr 2, 2020 at 10:19 AM Julian Bruno
>> >>>> >>>> wrote:
>> >>>
>> >>>Hello Apache Beam Community,
>> >>>
>> >>>Please vote on the acceptance of the final
>> >>>design of the Firefly as Beam's mascot
>> >>>[1]. Please share your input no later than
>> >>>Monday, April 6, at noon Pacific Time.
>> >>>
>> >>>
>> >>>[ ] +1, Accept the donation of the Firefly
>> >>>design as Beam Mascot
>> >>>
>> >>>[ ] -1, Decline the donation of the
>> >>>Firefly design as Beam Mascot
>> >>>
>> >>>
>> >>>Vote is adopted by at least 3 PMC +1
>> >>>approval votes, with no PMC -1 disapproval
>> >>>
>> >>>votes. Non-PMC votes are still encouraged.
>> >>>
>> >>>PMC voters, please help by indicating your
>> >>>vote as "(binding)"
>> >>>
>> >>>
>> >>>The vote and input phase will be open
>> >>>until Monday, April 6, at 12 pm Pacific
>> Time.
>> >>>
>> >>>
>> >>>Thank you very much for your feedback and
>> >>>ideas,
>> >>>
>> >>>Julian
>> >>>
>> >>>
>> >>>[1]
>> >>>
>> https://docs.google

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

2020-04-21 Thread David Morávek
Hi Stephen,

nice catch and awesome report! ;) This definitely needs a proper fix. I've
created a new JIRA to track the issue and will try to resolve it soon, as
this seems critical to me.

https://issues.apache.org/jira/browse/BEAM-9794
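
For context, 32,767 is Short.MAX_VALUE, and the IllegalArgumentException comes from a
Preconditions.checkArgument in Flink's OperatorBackendSerializationProxy. My working
assumption (to be verified on the JIRA) is that the @RequiresStableInput buffering ends up
registering a new operator state per checkpoint, so the number of registered states
eventually trips a guard roughly of this shape (a sketch only, not the actual Flink source):

// Sketch of the kind of guard implied by the stack trace; the real check lives in
// Flink's OperatorBackendSerializationProxy, this is only an approximation of it.
static void checkRegisteredStateCount(int registeredOperatorStates) {
  if (registeredOperatorStates > Short.MAX_VALUE) { // 32,767
    throw new IllegalArgumentException(
        "too many registered operator states: " + registeredOperatorStates);
  }
}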

Thanks,
D.

On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel 
wrote:

> I was able to reproduce this in a unit test:
>
> @Test
> public void test() throws InterruptedException, ExecutionException {
>   FlinkPipelineOptions options =
>       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>   options.setCheckpointingInterval(10L);
>   options.setParallelism(1);
>   options.setStreaming(true);
>   options.setRunner(FlinkRunner.class);
>   options.setFlinkMaster("[local]");
>   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>
>   Pipeline pipeline = Pipeline.create(options);
>   pipeline
>       .apply(Create.of((Void) null))
>       .apply(
>           ParDo.of(
>               new DoFn<Void, Void>() {
>
>                 private static final long serialVersionUID = 1L;
>
>                 @RequiresStableInput
>                 @ProcessElement
>                 public void processElement() {}
>               }));
>
>   pipeline.run();
> }
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel 
> wrote:
>
>> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> Currently it's configured to checkpoint once a minute, and after around
>> 32000-33000 checkpoints, it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
>>> in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>>  (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
>>> RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize
>>> checkpoint 32702 for operator  (1/2).}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702
>>> for operator  (1/2).
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>> ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.IllegalArgumentException
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>> at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>> ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>> at
>>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.(OperatorBackendSerializationProxy.java:68)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:2

Re: Flink: Lost pane timing at some steps of pipeline

2020-05-04 Thread David Morávek
Hi Jozef, I think this is expected behavior, as Flink does not use the default
expansion for Reshuffle (it uses a round-robin rebalance ship strategy instead).
There is no aggregation that needs buffering (and triggering). All of the
elements are immediately emitted to downstream operations after the
Reshuffle.

In the case of the direct runner, this is just a side effect of the Reshuffle
expansion. See
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L69
for more details.

I don't think we should expect Reshuffle to have the same semantics as GBK,
because it's only a performance optimization step that should not have any
effect on the pipeline's overall result. Some runners may also completely
ignore this step as part of the execution plan optimization process (e.g. two
reshuffles in a row are idempotent); see
https://issues.apache.org/jira/browse/BEAM-9824
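
If the downstream logic really needs the pane timing, routing the elements through an
explicit GroupByKey (the line you have commented out) forces a real triggering step. A
minimal Java sketch of that workaround - the element type and transform names are just
assumptions:

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;

// "input" is assumed to be the windowed/triggered PCollection<String> from your example.
static PCollection<String> throughRealGbk(PCollection<String> input) {
  return input
      .apply("Key", WithKeys.of("static_key"))               // same static key as in your snippet
      .apply("RealGBK", GroupByKey.<String, String>create()) // pane info is produced here
      .apply("Values", Values.create())                      // PCollection<Iterable<String>>
      .apply("FlattenIterables", Flatten.iterables());       // back to individual elements
}

This of course re-introduces the grouping cost that Reshuffle was supposed to avoid, so
I'd only do it where the pane timing really matters.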

D.

On Mon, May 4, 2020 at 2:48 PM Jozef Vilcek  wrote:

> I have a pipeline which
>
> 1. Read from KafkaIO
> 2. Does stuff with events and writes windowed file via FileIO
> 3. Apply a stateful DoFn on the written files info
>
> The stateful DoFn does some logic which depends on PaneInfo.Timing, if it
> is EARLY or something else. When testing in DirectRunner, all is good. But
> with FlinkRunner, panes are always NO_FIRING.
>
> To demonstrate this, here is a dummy test pipeline:
>
> val testStream = sc.testStream(testStreamOf[String]
>   .advanceWatermarkTo(new Instant(1))
>   .addElements(goodMessage, goodMessage)
>   .advanceWatermarkTo(new Instant(2))
>   .addElements(goodMessage, goodMessage)
>   .advanceWatermarkTo(new Instant(200))
>   .addElements(goodMessage, goodMessage)
>   .advanceWatermarkToInfinity())
>
> testStream
>   .withFixedWindows(
> duration = Duration.standardSeconds(1),
> options = WindowOptions(
>   trigger = AfterWatermark.pastEndOfWindow()
> .withEarlyFirings(AfterPane.elementCountAtLeast(1))
> .withLateFirings(AfterPane.elementCountAtLeast(1)),
>   accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
>   allowedLateness = Duration.standardDays(1)
> ))
>   .keyBy(_ => "static_key")
>   .withPaneInfo
>   .map { case (element, paneInfo) =>
> println(s"#1 - $paneInfo")
> element
>   }
>   //.groupByKey // <- need to uncomment this for Flink to work
>   .applyTransform(Reshuffle.viaRandomKey())
>   .withPaneInfo
>   .map { case (element, paneInfo) =>
> println(s"#2 - $paneInfo")
> element
>   }
>
> When executed with DirectRunner, #1 prints pane with UNKNOWN timing and #2
> with EARLY, which is what I expect. When run with Flink runner, both #1 and
> #2 writes UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add extra GBK,
> then #2 writes panes with EARLY timing.
>
> This is run on Beam 2.19. I was trying to analyze where the problem could be,
> but got lost. I will be happy for any suggestions or pointers. Does it
> sound like a bug, or am I doing something wrong?
>


Re: Validates Runner on Java 11 and the Java SDK Harness

2020-05-07 Thread David Morávek
Great effort Ismaël! ;) Can't wait to try this out :)

On Thu, May 7, 2020 at 12:08 PM Ismaël Mejía  wrote:

> Filled https://issues.apache.org/jira/browse/BEAM-9915 for the moment
> to track this.
>
> On Wed, Apr 22, 2020 at 10:35 PM Mikhail Gryzykhin 
> wrote:
> >
> > +Paweł Pasterz
> >
> > On Wed, Apr 22, 2020, 13:23 Pablo Estrada  wrote:
> >>
> >> +Mikhail Gryzykhin fyi : )
> >>
> >> On Tue, Apr 21, 2020 at 1:25 PM Ismaël Mejía  wrote:
> >>>
> >>> I have been working in recent days on enabling the CI tests for Java
> 11 in
> >>> Jenkins for Flink (based on latest release 1.10.x, already merged) and
> Spark
> >>> (based on upcoming release 3.x.x tested locally), so far we have good
> progress
> >>> for both classical runners with only one test failing the complete
> suite! So
> >>> soon we would be able to announce that we support Java 11 for our most
> popular
> >>> open source classical runners.
> >>>
> >>> The next step is to tackle the portable validates runner tests, and
> when I
> >>> looked at those I realized that we are not publishing the Java SDK
> Harness based
> >>> on Java 11. I would like to know if someone might be interested on
> taking this
> >>> task so we can publish the Java 11 SDK Harness docker image too maybe
> as part of
> >>> the next release.
> >>>
> >>> Anyone who knows the harness container part interested on creating a
> >>> JIRA and working on a fix for this?
> >>>
> >>> Regards,
> >>> Ismaël
>


snapshot releases

2020-05-13 Thread David Morávek
Hi,

I've just lost quite some time on an issue related to how we currently
release Beam snapshots.

Because we only publish snapshots from the master branch, the unreleased
version of Beam (2.21.0-SNAPSHOT) was last updated on the 9th of April.

e.g.
https://repository.apache.org/content/groups/snapshots/org/apache/beam/beam-runners-core-java/2.21.0-SNAPSHOT/

This may be confusing for the end user, and I think we should either:
- Upload snapshots from all release branches.
- Keep only snapshots for the current master version (delete all prior
versions), so that users fail fast in their CI builds.

WDYT?

D.


dealing with late data output timestamps

2020-05-28 Thread David Morávek
Hi,

I've come across "unexpected" model behaviour when dealing with late data
and custom timestamp combiners. Let's take the following pipeline as an
example:

final PCollection input = ...;
input
    .apply(
        "GlobalWindows",
        Window.into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .accumulatingFiredPanes())
    .apply("Aggregate", Count.perElement());

The above pipeline emits updates with the latest input timestamp it has
seen so far (from non-late elements). We write the output, with this
timestamp, to Kafka and read it from another pipeline.

The problem comes when we need to handle late elements behind the output watermark.
In this case Beam can not use the combined timestamp and uses the EOW timestamp
instead. Unfortunately this results in the downstream pipeline progressing its
input watermark to the end of the global window. Also, if we used fixed windows
after this aggregation, it would yield unexpected results.

There is no reasoning about this behaviour in the last section of the lateness
design doc [1], so I'd like to open a
discussion about what the expected result should be.

My personal opinion is that the correct approach would be emitting late
elements with the currentOutputWatermark rather than EOW in the case of the
EARLIEST and LATEST timestamp combiners.

I've prepared a failing test case for ReduceFnRunner [3],
if anyone wants to play around with the issue.

I also think that BEAM-2262 [2] may be related to
this discussion.

[1] https://s.apache.org/beam-lateness
[2] https://issues.apache.org/jira/browse/BEAM-2262
[3]
https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b

Looking forward to hearing your thoughts.

Thanks,
D.


Re: Remove EOL'd Runners

2020-06-10 Thread David Morávek
+1

On Tue, Jun 9, 2020 at 7:43 PM Ahmet Altay  wrote:

> Thank you Tyson!
>
> On Tue, Jun 9, 2020 at 10:20 AM Thomas Weise  wrote:
>
>> +1
>>
>>
>> On Tue, Jun 9, 2020 at 9:41 AM Robert Bradshaw 
>> wrote:
>>
>>> Makes sense to me.
>>>
>>> On Tue, Jun 9, 2020 at 8:45 AM Maximilian Michels 
>>> wrote:
>>>
 Thanks for the heads-up, Tyson! It's a sensible decision to remove
 unsupported runners.

 -Max

 On 09.06.20 16:51, Tyson Hamilton wrote:
 > Hi All,
 >
 > As part of the Fixit [1] I'd like to remove EOL'd runners, Apex and
 Gearpump, as described in BEAM- [2]. This will be a big PR I think and
 didn't want anyone to be surprised. There is already some agreement in the
 linked Jira issue. If there are no objections I'll get started later today
 or tomorrow.
 >
 > -Tyson
 >
 >
 > [1]:
 https://lists.apache.org/thread.html/r9ddc77a8fee58ad02f68e2d9a7f054aab3e55717cc88ad1d5bc49311%40%3Cdev.beam.apache.org%3E
 > [2]: https://issues.apache.org/jira/browse/BEAM-
 >

>>>


Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread David Morávek
Hi Teodor,

Thanks for bringing this up. This is a known, long-standing "issue".
Unfortunately there are a few things we need to consider:

- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We *can not break* existing pipelines.
- The Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here to make things faster:

- We can try a similar approach to HadoopIO's
(HadoopInputFormatReader#isKnownImmutable): check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover
most of the performance cost without breaking the guarantees the Beam model
provides (see the sketch below).
- We could enable registration of custom "immutable" types via pipeline
options (this may be an unnecessary knob, so it needs further
discussion).
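
To make the first two points concrete, here is a rough sketch of what such a fast path
could look like in the runner's element-copy step (the class and helper below are my own
names for illustration, not the actual CoderTypeSerializer code):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

// Rough sketch only; not the actual Flink runner code.
final class ImmutableAwareCopier<T> {

  private final Coder<T> coder;

  ImmutableAwareCopier(Coder<T> coder) {
    this.coder = coder;
  }

  T copy(T value) throws CoderException {
    if (isKnownImmutable(value)) {
      // Safe to hand the same instance downstream; no serialization round trip needed.
      return value;
    }
    // Conservative fallback: deep copy through the coder, as the runner does today.
    return CoderUtils.clone(coder, value);
  }

  private static boolean isKnownImmutable(Object value) {
    // Conservative whitelist; anything unrecognized still gets deep-copied.
    return value == null
        || value instanceof String
        || value instanceof Number
        || value instanceof Boolean;
  }
}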

WDYT?

D.


On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
wrote:

> Hey!
>
> I'm a student at the University of Oslo, and I'm writing a master thesis
> about the possibility of using Beam to benchmark stream processing
> systems. An important factor in this is the overhead associated with
> using Beam over writing code for the runner directly. [1] found that
> there was a large overhead associated with using Beam, but did not
> investigate where this overhead came from. I've done benchmarks and
> confirmed the findings there, where for simple chains of identity
> operators, Beam is 43x times slower than the Flink equivalent.
>
> These are very simple pipelines, with custom sources that just output a
> series of integers. By profiling I've found that most of the overhead
> comes from serializing and deserializing. Specifically the way
> TypeSerializer's, [2], is implemented in [3], where each object is
> serialized and then deserialized between every operator. Looking into
> the semantics of Beam, no operator should change the input, so we don't
> need to do a copy here. The function in [3] could potentially be changed
> to a single `return` statement.
>
> Doing this removes 80% of the overhead in my tests. This is a very
> synthetic example, but it's a low hanging fruit and might give a speed
> boost to many pipelines when run on the Flink runnner. I would like to
> make this my first contribution to Beam, but as the guide [4] says, I
> thought I'd ask here first to see if there a is a reason not to do this.
>
> Only objection I can see, is that it might break existing pipelines
> which rely on the Flink runner saving them from not following the
> immutability guarantee. I see this as a small loss as they are relying
> on an implementation detail of the Flink runner.
>
> I hope I have explained this adequately and eagerly await any feedback :)
>
> Best regards,
> Teodor Spæren
>
> [1]: https://arxiv.org/abs/1907.08302
> [2]:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
> [3]:
> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
> [4]: https://beam.apache.org/contribute/
>


Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread David Morávek
You made a really good argument ;) I'm inclined towards an experimental opt-in
flag that would enable this. It would be great if we could automatically
check for violations - kind of a safety net for mistakes in user code.
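
The kind of knob I have in mind - purely illustrative, the interface and option name below
are assumptions and don't exist in Beam today:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Illustrative only; not an existing Beam API.
public interface ObjectReuseOptions extends PipelineOptions {

  @Description(
      "Experimental, opt-in: reuse output objects between chained Flink operators "
          + "instead of deep-copying them via their coder. Only safe if the pipeline "
          + "never mutates its inputs/outputs, as required by the Beam model.")
  @Default.Boolean(false)
  Boolean getExperimentalObjectReuse();

  void setExperimentalObjectReuse(Boolean value);
}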

Just to note, the direct runner's enforcement may not cover all cases, as it only
checks the binary representation after serialization. Also, there are
programmers who don't write tests, especially during prototyping (not an
argument for the perf. penalty, but something to keep in mind).

Max, WDYT?





On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren 
wrote:

> Some more thoughts:
>
> As it says on the DirectRunner [1] page, the DirectRunner is meant to
> check that users don't rely on semantics that are not guaranteed by the
> Beam model.
>
> Programs that rely on the Flink runner deep cloning the inputs between
> each operator in the pipeline is relying on a semantic that is not
> guaranteed by the Beam model, and those pipelines would fail if ran on
> the DirectRunner.
>
> As I stated in the previous email, I have some example programs that
> return different outputs on the Flink runner and on the DirectRunner. I
> have not tested these programs on other runners, so I don't know what
> they would return. If they return different answers than the
> DirectRunner, I'm inclined to say that the DirectRunner should either be
> changed, or the runners be changed.
>
>  From my very limited point of view, the Flink runner seems to be
> spending a lot of extra time implementing a semantic guarantee that the
> Beam model explicitly doesn't support.
>
>
> Best regards,
> Teodor Spæren
>
> [1]: https://beam.apache.org/documentation/runners/direct/
>
> On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
> >Hey David,
> >
> >I think I might have worded this poorly, because what I meant is that
> >from what I can see in [1], the BEAM model explicitly states that
> >PCollections should be treated as immutable. The direct runner also
> >tests for this. Do the other runners also protect the user from
> >misusing the system so? If not we have a situation where running the
> >same pipeline on two different runners will yield different answers. I
> >can show some examples that return different examples for the Flink
> >and the Direct Runner.
> >
> >I agree that a breaking existing pipelines is a no-no, but I do think
> >that we could simply gate this behind an option on the Flink runner.
> >
> >I also tried to search for this before, but did not find any mention
> >of it, can you link me to some discussions about this in the past?
> >
> >Thanks for reply :D
> >
> >Best regards,
> >Teodor Spæren
> >
> >[1]:
> https://beam.apache.org/documentation/programming-guide/#immutability
> >
> >
> >On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
> >>Hi Teodor,
> >>
> >>Thanks for bringing this up. This is a known, long standing "issue".
> >>Unfortunately there are few things we need to consider:
> >>
> >>- As you correctly noted, the *Beam model doesn't enforce immutability*
> of
> >>input / output elements, so this is the price.
> >>- We* can not break *existing pipelines.
> >>- Flink Runner needs to provide the *same guarantees as the Beam model*.
> >>
> >>There are definitely some things we can do here, to make things faster:
> >>
> >>- We can try the similar approach as HadoopIO
> >>(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
> >>types (KV, primitives, protobuf, other known internal immutable
> structures).
> >>-* If the type is immutable, we can safely reuse it.* This should cover
> >>most of the performance costs without breaking the guarantees Beam model
> >>provides.
> >>- We can enable registration of custom "immutable" types via pipeline
> >>options? (this may be an unnecessary knob, so this needs a further
> >>discussion)
> >>
> >>WDYT?
> >>
> >>D.
> >>
> >>
> >>On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren  >
> >>wrote:
> >>
> >>>Hey!
> >>>
> >>>I'm a student at the University of Oslo, and I'm writing a master thesis
> >>>about the possibility of using Beam to benchmark stream processing
> >>>systems. An important factor in this is the overhead associated with
> >>>using Beam over writing code for the runner directly. [1] found that
> >>>there was a large overhead associated with using Beam, but did not

Re: [DISCUSS] Drop support for Flink 1.8 and 1.9

2021-03-12 Thread David Morávek
+1

D.

On Thu, Mar 11, 2021 at 8:33 PM Ismaël Mejía  wrote:

> +user
>
> > Should we add a warning or something to 2.29.0?
>
> Sounds like a good idea.
>
>
>
>
> On Thu, Mar 11, 2021 at 7:24 PM Kenneth Knowles  wrote:
> >
> > Should we add a warning or something to 2.29.0?
> >
> > On Thu, Mar 11, 2021 at 10:19 AM Ismaël Mejía  wrote:
> >>
> >> Hello,
> >>
> >> We have been supporting older versions of Flink that we had agreed in
> previous
> >> discussions where we said we will be supporting only the latest three
> releases
> >> [1].
> >>
> >> I would like to propose that for Beam 2.30.0 we stop supporting Flink
> 1.8 and
> >> 1.9 [2].  I prepared a PR for this [3] but of course I wanted to bring
> the
> >> subject here (and to user@) for your attention and in case someone has
> a
> >> different opinion or reason to still support the older versions.
> >>
> >> WDYT?
> >>
> >> Regards,
> >> Ismael
> >>
> >> [1]
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
> >> [2] https://issues.apache.org/jira/browse/BEAM-11948
> >> [3] https://github.com/apache/beam/pull/14203
>


Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread David Morávek
Hi,

+1 as we've agreed to keep support for three latest major releases in the
past

D.

On Mon, May 31, 2021 at 9:54 AM Jan Lukavský  wrote:

> Hi,
>
> +1 to remove the support for 1.10.
>
>  Jan
> On 5/28/21 10:00 PM, Ismaël Mejía wrote:
>
> Hello,
>
> With Beam support for Flink 1.13 just merged it is the time to discuss the
> end of
> support for Flink 1.10 following the agreed policy on supporting only the
> latest
> three Flink releases [1].
>
> I would like to propose that for Beam 2.31.0 we stop supporting Flink 1.10
> [2].
> I prepared a PR for this [3] but of course I wanted to bring the subject
> here
> (and to user@) for your attention and in case someone has a different
> opinion or
> reason to still support the older version.
>
> WDYT?
>
> Regards,
> Ismael
>
> [1]
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
> [2] https://issues.apache.org/jira/browse/BEAM-12281
> [3] https://github.com/apache/beam/pull/14906
>
>


Re: Unable to read state Written by Beam application with Flink runner using Flink's State Processor API

2021-08-06 Thread David Morávek
Hi Sandeep,

I've stitched together a little working example of how to read Beam state
using the State Processor API. It's basically a thin wrapper around the
existing API. From the example you can see that accessing Beam state is
far more involved, as it's another layer on top of Flink state
primitives. Another notable difference is that Beam uses custom state
namespaces.

Having a toolkit for simple reading of Beam savepoints would definitely be
a great addition to the existing Beam Flink Runner ecosystem. I'm cc'ing
dev@beam..., in case someone from the community would be interested in working
on this topic.

https://github.com/dmvk/beam-flink-state-processor/blob/master/src/main/java/org/apache/dmvk/beam/BeamSavepoint.java
https://github.com/dmvk/beam-flink-state-processor/blob/master/src/test/java/org/apache/dmvk/beam/BeamSavepointTest.java#L37

Best,
D.

On Wed, Aug 4, 2021 at 9:03 AM David Morávek  wrote:

> Hi Sandeep, thanks for the example, I'll take a look into it and will get
> back to you ;)
>
> On Tue, Aug 3, 2021 at 9:44 PM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi David,
>> Thanks for the reply. I tried with Beam 2.29 and Flink
>> 1.12 and still getting NullPointerException like before. I changed the code
>> a bit to remove all the proprietary software used in our company and able
>> to reproduce the issue with local Kafka, Beam with Flink runner running
>> locally.
>>
>>Beam Flink runner code:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/processor/Processor.java
>>Local Kafka producer:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/kafka/SimpleKafkaProducer.java
>> Reading state using State processor API:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/readstate/StateReader.java
>>
>> Thanks,
>> Sandeep
>>
>>
>> On 7/27/21, 10:10 AM, "David Morávek"  wrote:
>>
>> This email is from an external sender.
>>
>>
>> Hi Sandeep,
>>
>> In general I'd say it will be tricky to read Beam state this way as it
>> doesn't use Flink primitives, but it's writing state in custom binary
>> format (it can be de-serialized, but it's not easy to put all of the
>> pieces
>> together).
>>
>> Can you please share an example code of how you're reading the state?
>> Also
>> can please you try this with latest Beam / Flink versions (the ones
>> you're
>> using are no longer supported)?
>>
>> Best,
>> D.
>>
>> On Tue, Jul 27, 2021 at 5:46 PM Kathula, Sandeep
>>  wrote:
>>
>> > Hi,
>> >  We have a simple Beam application like a work count running
>> with
>> > Flink runner (Beam 2.26 and Flink 1.9). We are using Beam’s value
>> state. I
>> > am trying to read the state from savepoint using  Flink's State
>> Processor
>> > API but getting a NullPointerException. Converted the whole code
>> into Pure
>> > Flink application, created a savepoint and tried to read the state
>> where we
>> > are able to read the state successfully.
>> >
>> > Exception Stack trace:
>> >
>> > Exception in thread "main"
>> > org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> > at
>> >
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> > at
>> >
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>> > at
>> >
>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>> > at
>> >
>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>> > at
>> >
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>> > at
>> > org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>> > at
>> > com.intuit.spp.example.StateReader.main(StateReader.java:34)
>> > Caused by: java.io.IOException: Failed to restore state backend
>> > at
>> >
>> org.apache.flink.state.api.

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness
that it's not OK to send a user-related question to four Apache mailing
lists (that I know of). Namely:

- u...@flink.apache.org
- d...@flink.apache.org
- u...@beam.apache.org
- dev@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day,
and it's very unfortunate if any of this work needs to be duplicated. Next
time please direct Beam-related user questions solely to u...@beam.apache.org.

Thanks for your understanding. You can consult community guidelines [1][2]
if you are not sure where the particular question belongs to.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create a high pressure on state backend and/or heap, which could
> result in suboptimal performance. Due to the "connection loss" and timeout
> exceptions you describe I'd suppose there might be a lot of GC pressure.
>
>  Jan
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
>We have a simple Beam application which reads from Kafka, converts to
> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection and then writing to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that it won’t have a small file problem when reading back from
> Hive. But from what we observed it is taking too much memory to write to S3
> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
> going OOM). When I increase memory for heap to 32GB then it takes a lot of
> time to write records to S3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection parquetRecord = …….
>
> parquetRecord.apply(FileIO.write()
>     .via(ParquetIO.sink(getOutput_schema()))
>     .to(outputPath.isEmpty() ? outputPath() : outputPath)
>     .withNumShards(5)
>     .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>1. *UserCodeException*:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>
> at
> org.apache.beam.runners.core.Simp

Re: Flink support for OrderedListState

2021-11-16 Thread David Morávek
Hi Reuven,

this would be a great addition to the Flink Runner and could help with
broader adoption ;)

To make an effective implementation that works well across different state
backends, this will most likely require adding a new primitive state type
to Flink's state backend ecosystem. I'll do some analysis to see what's
necessary and will get back to you by the end of the week. I can also
shepherd the effort on the Flink side.
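
To give a rough idea of the shape such a primitive could take - the interface below is
purely hypothetical and only mirrors what OrderedListState needs (inserts, range reads,
and range deletes over an int64 sort key):

// Hypothetical Flink state primitive; nothing like this exists yet.
public interface SortedListState<T> {

  // Append a value under the given sort key (Beam uses a timestamp here).
  void add(long sortKey, T value) throws Exception;

  // Read all values whose sort key falls into [minInclusive, maxExclusive), in order.
  Iterable<T> readRange(long minInclusive, long maxExclusive) throws Exception;

  // Remove all values whose sort key falls into [minInclusive, maxExclusive).
  void clearRange(long minInclusive, long maxExclusive) throws Exception;
}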

I think it's a good idea to start with the naive implementation anyway, as
the efficient implementation could potentially only be supported by
not-yet-released Flink versions.

Feel free to assign me as a reviewer for this on the Beam side, I'm still
familiar with the Flink runner code base.

Best,
D.



On Tue, Nov 16, 2021 at 5:29 AM Reuven Lax  wrote:

> Not hearing any answers - I'll add a naive implementation to our Flink
> runner then.
>
> Who currently is the best reviewer for changes to the Flink runner?
>
> On Mon, Nov 15, 2021 at 1:49 AM Jan Lukavský  wrote:
>
>> Hi Reuven,
>>
>> cc d...@flink.apache.org.
>>
>>  Jan
>> On 11/12/21 04:35, Reuven Lax wrote:
>>
>> OrderedListState was
>> added to Beam over a year ago. To date it is only supported by the Dataflow
>> runner and the DirectRunner. I want to see if it's possible to support this
>> well on the Flink runner (and eventually on other runners as well).
>>
>> This is a state type that is indexed by an int64 sorting key (the Beam
>> API exposes this as a 64-bit timestamp, as that's the most-common use case,
>> but fundamentally it's just an integer). Users can insert element, fetch
>> ranges of elements, and delete ranges of elements.
>>
>> Is there any way to implement this on Flink? I could of course add a
>> naive implementation - store everything in a ListState and have the Flink
>> runner sort the list every time it is fetched. This seems quite
>> inefficient, and the range deletes will be even less efficient.
>> https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> Flink has considered sorted state primitives, but I don't see any activity
>> on this issue.
>>
>> Is there any way to do this, or should I just add the naive
>> implementation?
>>
>> Reuven
>>
>>


Re: Flink support for OrderedListState

2021-11-16 Thread David Morávek
>
> Based on what Aljoscha said, storing it in MapState would be better as in
> practice the map will be sorted (as long as the key encoding is order
> preserving, and on quick glance InstantCoder is order preserving), so I'll
> change that


This property only holds for the RocksDB-based state backend, so we
unfortunately can not leverage it. For example, HeapStateBackend is backed
by a simple HashMap.

Best,
D.

On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax  wrote:

> Sounds great!
>
> I implemented the naive approach, but my current impl is based on
> ListState, sorting it every time it's read. Based on what Aljoscha said,
> storing it in MapState would be better as in practice the map will be
> sorted (as long as the key encoding is order preserving, and on quick
> glance InstantCoder is order preserving), so I'll change that. This still
> requires fetching the entire list every time, even if the user only issues
> a range fetch, but probably good enough for now. Would be cool if Flink
> were able to support this natively and efficiently.
>
> While trying to test this I realized that the Flink runner also didn't
> support the OnWindowExpiration callback and all the relevant tests
> used that callback. I'll first send a PR implementing OnWindowExpiration.
>
> To Robert's point - we should definitely add this to the protos and
> support it on the portable runners as well!
>
> Reuven
>
> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> sorry for the slow response! I was on vacation and only saw this now.
>>
>> I can't add much more than what David already said: right now, it's not
>> possible to do an *efficient* implementation on Flink. The closest Flink
>> state type would be MapState, which keeps state in multiple underlying
>> RocksDB key/value cells (for the RocksDB state backend). That already keeps
>> things sorted, but it's sorted by the serialized representation in RocksDB,
>> which might just be the correct sorting for int64 timestamps. We don't
>> expose that guarantee, though, because it might not be true for other state
>> backends.
>>
>> It would require convincing the Flink folks to add such a new state type,
>> though.
>>
>> Best,
>> Aljoscha
>>
>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>> > OrderedListState
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
>> >
>> > was
>> > added to Beam over a year ago. To date it is only supported by the
>> > Dataflow
>> > runner and the DirectRunner. I want to see if it's possible to support
>> > this
>> > well on the Flink runner (and eventually on other runners as well).
>> >
>> > This is a state type that is indexed by an int64 sorting key (the Beam
>> API
>> > exposes this as a 64-bit timestamp, as that's the most-common use case,
>> but
>> > fundamentally it's just an integer). Users can insert element, fetch
>> ranges
>> > of elements, and delete ranges of elements.
>> >
>> > Is there any way to implement this on Flink? I could of course add a
>> > naive implementation - store everything in a ListState and have the
>> Flink
>> > runner sort the list every time it is fetched. This seems quite
>> > inefficient, and the range deletes will be even less efficient.
>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>> Flink
>> > has considered sorted state primitives, but I don't see any activity on
>> > this issue.
>> >
>> > Is there any way to do this, or should I just add the naive
>> implementation?
>> >
>> > Reuven
>>
>


Re: Flink support for OrderedListState

2021-11-16 Thread David Morávek
Yes, my intuition is that it should be more or less the same. There might
be some penalty in terms of having more keys in RocksDB, e.g. on the
delete code path you need to write more tombstones (+ compaction), but other
than that I can't think of anything.

You'd still need to sort all the data in memory after reading it, but in
this particular case it should save some cycles.
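
For reference, the naive read path on top of MapState would look roughly like this (a
sketch with assumed names, not the actual runner code):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.flink.api.common.state.MapState;
import org.joda.time.Instant;

// Naive sketch: the "ordered list" is kept in a MapState keyed by the timestamp (millis);
// we sort in memory because iteration order is not guaranteed across all state backends.
static <T> List<TimestampedValue<T>> readSorted(MapState<Long, List<T>> state) throws Exception {
  List<TimestampedValue<T>> result = new ArrayList<>();
  for (Map.Entry<Long, List<T>> entry : state.entries()) {
    for (T value : entry.getValue()) {
      result.add(TimestampedValue.of(value, new Instant(entry.getKey())));
    }
  }
  result.sort(Comparator.comparing(TimestampedValue::getTimestamp));
  return result;
}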

On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax  wrote:

> Right, but no worse than storing in ListState for that case, right?
>
> On Tue, Nov 16, 2021 at 10:44 AM David Morávek  wrote:
>
>> Based on what Aljoscha said, storing it in MapState would be better as in
>>> practice the map will be sorted (as long as the key encoding is order
>>> preserving, and on quick glance InstantCoder is order preserving), so I'll
>>> change that
>>
>>
>> This property only holds for RocksDB based state backend, so we
>> unfortunately can not leverage that. For example HeapStateBackend is backed
>> by a simple HashMap.
>>
>> Best,
>> D.
>>
>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax  wrote:
>>
>>> Sounds great!
>>>
>>> I implemented the naive approach, but my current impl is based on
>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>> storing it in MapState would be better as in practice the map will be
>>> sorted (as long as the key encoding is order preserving, and on quick
>>> glance InstantCoder is order preserving), so I'll change that. This still
>>> requires fetching the entire list every time, even if the user only issues
>>> a range fetch, but probably good enough for now. Would be cool if Flink
>>> were able to support this natively and efficiently.
>>>
>>> While trying to test this I realized that the Flink runner also didn't
>>> support the OnWindowExpiration callback and all the relevant tests
>>> used that callback. I'll first send a PR implementing OnWindowExpiration.
>>>
>>> To Robert's point - we should definitely add this to the protos and
>>> support it on the portable runners as well!
>>>
>>> Reuven
>>>
>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> sorry for the slow response! I was on vacation and only saw this now.
>>>>
>>>> I can't add much more than what David already said: right now, it's not
>>>> possible to do an *efficient* implementation on Flink. The closest Flink
>>>> state type would be MapState, which keeps state in multiple underlying
>>>> RocksDB key/value cells (for the RocksDB state backend). That already keeps
>>>> things sorted, but it's sorted by the serialized representation in RocksDB,
>>>> which might just be the correct sorting for int64 timestamps. We don't
>>>> expose that guarantee, though, because it might not be true for other state
>>>> backends.
>>>>
>>>> It would require convincing the Flink folks to add such a new state
>>>> type, though.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>> > OrderedListState
>>>> > <
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
>>>> >
>>>> > was
>>>> > added to Beam over a year ago. To date it is only supported by the
>>>> > Dataflow
>>>> > runner and the DirectRunner. I want to see if it's possible to
>>>> support
>>>> > this
>>>> > well on the Flink runner (and eventually on other runners as well).
>>>> >
>>>> > This is a state type that is indexed by an int64 sorting key (the
>>>> Beam API
>>>> > exposes this as a 64-bit timestamp, as that's the most-common use
>>>> case, but
>>>> > fundamentally it's just an integer). Users can insert element, fetch
>>>> ranges
>>>> > of elements, and delete ranges of elements.
>>>> >
>>>> > Is there any way to implement this on Flink? I could of course add a
>>>> > naive implementation - store everything in a ListState and have the
>>>> Flink
>>>> > runner sort the list every time it is fetched. This seems quite
>>>> > inefficient, and the range deletes will be even less efficient.
>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply that
>>>> Flink
>>>> > has considered sorted state primitives, but I don't see any activity
>>>> on
>>>> > this issue.
>>>> >
>>>> > Is there any way to do this, or should I just add the naive
>>>> implementation?
>>>> >
>>>> > Reuven
>>>>
>>>


Re: Flink support for OrderedListState

2021-11-22 Thread David Morávek
Hi Reuven,

I've put together a little POC of the Flink-side support [1] for
OrderedListState over the weekend, PTAL. Overall I'd say it should be a pretty
straightforward and backward-compatible change.

This effort will definitely require creating a new FLIP and making a good
case for the Flink community. Would you be willing to participate in this
contribution?

[1]
https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0

Best,
D.

On Tue, Nov 16, 2021 at 8:34 PM David Morávek  wrote:

> Yes, my intuition is that it should be more or less the same. There might
> be some penalty in terms of having more keys in the RocksDB, eg. on the
> delete code path you need to write more tombstones + compaction, but other
> than that I can't think of anything.
>
> You'd still need to sort all the data in memory after reading them, but in
> this particular case it should save some cycles.
>
> On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax  wrote:
>
>> Right, but no worse than storing in ListState for that case, right?
>>
>> On Tue, Nov 16, 2021 at 10:44 AM David Morávek  wrote:
>>
>>> Based on what Aljoscha said, storing it in MapState would be better as
>>>> in practice the map will be sorted (as long as the key encoding is order
>>>> preserving, and on quick glance InstantCoder is order preserving), so I'll
>>>> change that
>>>
>>>
>>> This property only holds for RocksDB based state backend, so we
>>> unfortunately can not leverage that. For example HeapStateBackend is backed
>>> by a simple HashMap.
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax  wrote:
>>>
>>>> Sounds great!
>>>>
>>>> I implemented the naive approach, but my current impl is based on
>>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>>> storing it in MapState would be better as in practice the map will be
>>>> sorted (as long as the key encoding is order preserving, and on quick
>>>> glance InstantCoder is order preserving), so I'll change that. This still
>>>> requires fetching the entire list every time, even if the user only issues
>>>> a range fetch, but probably good enough for now. Would be cool if Flink
>>>> were able to support this natively and efficiently.
>>>>
>>>> While trying to test this I realized that the Flink runner also didn't
>>>> support the OnWindowExpiration callback and all the relevant tests
>>>> used that callback. I'll first send a PR implementing OnWindowExpiration.
>>>>
>>>> To Robert's point - we should definitely add this to the protos and
>>>> support it on the portable runners as well!
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Nov 16, 2021 at 6:02 AM Aljoscha Krettek 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> sorry for the slow response! I was on vacation and only saw this now.
>>>>>
>>>>> I can't add much more than what David already said: right now, it's
>>>>> not possible to do an *efficient* implementation on Flink. The closest
>>>>> Flink state type would be MapState, which keeps state in multiple
>>>>> underlying RocksDB key/value cells (for the RocksDB state backend). That
>>>>> already keeps things sorted, but it's sorted by the serialized
>>>>> representation in RocksDB, which might just be the correct sorting for
>>>>> int64 timestamps. We don't expose that guarantee, though, because it might
>>>>> not be true for other state backends.
>>>>>
>>>>> It would require convincing the Flink folks to add such a new state
>>>>> type, though.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, Nov 12, 2021, at 04:35, Reuven Lax wrote:
>>>>> > OrderedListState
>>>>> > <
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
>>>>> >
>>>>> > was
>>>>> > added to Beam over a year ago. To date it is only supported by the
>>>>> > Dataflow
>>>>> > runner and the DirectRunner. I want to see if it's possible to
>>>>> support
>>>>> > this
>>>>> > well on the Flink runner (and eventually on other runners as well).
>>>>> >
>>>>> > This is a state type that is indexed by an int64 sorting key (the
>>>>> Beam API
>>>>> > exposes this as a 64-bit timestamp, as that's the most-common use
>>>>> case, but
>>>>> > fundamentally it's just an integer). Users can insert element, fetch
>>>>> ranges
>>>>> > of elements, and delete ranges of elements.
>>>>> >
>>>>> > Is there any way to implement this on Flink? I could of course add a
>>>>> > naive implementation - store everything in a ListState and have the
>>>>> Flink
>>>>> > runner sort the list every time it is fetched. This seems quite
>>>>> > inefficient, and the range deletes will be even less efficient.
>>>>> > https://issues.apache.org/jira/browse/FLINK-6219 seems to imply
>>>>> that Flink
>>>>> > has considered sorted state primitives, but I don't see any activity
>>>>> on
>>>>> > this issue.
>>>>> >
>>>>> > Is there any way to do this, or should I just add the naive
>>>>> implementation?
>>>>> >
>>>>> > Reuven
>>>>>
>>>>


Re: Flink support for OrderedListState

2021-12-22 Thread David Morávek
>
> The only thing I'm not sure of is firstKey, lastKey methods on SortedMap -
> could this be implemented efficiently in the rocksdb store?
>

I think some kind of skip scan should be enough to do this.

As for sorting by an arbitrary key, I'd be a bit afraid of unlocking this
door :) Just remember the long-standing discussion about introducing
sorting capabilities into the Beam Model. For years the community didn't
want to hear anything about sorting, and then came to the conclusion that
if you need sorting, you should never need to sort by anything other than a
timestamp (e.g. for state machines, temporal joins).

Personally I can still imagine some use cases along the lines of building
HFiles, but for this case one sorting pass at the flush should be enough,
so we might just use some kind of external sorter instead.

D.

On Wed, Dec 1, 2021 at 6:03 PM Reuven Lax  wrote:

> To randomize this even further, if Flink can support arbitrary keys, then
> another approach here would be to implement Java's SortedMap API.
> FetchRange could then be implemented via map.subMap(begin, end) and
> clearRange could be done via map.subMap(begin, end).clear(). The only thing
> I'm not sure of is firstKey, lastKey methods on SortedMap - could this be
> implemented efficiently in the rocksdb store?
>
> On Tue, Nov 30, 2021 at 8:43 PM Reuven Lax  wrote:
>
>> Left a comment - wonder if we should let the list be sorted by an
>> arbitrary key. instead of timestamp. Timestamp is likely the more-common
>> use case, but I can imagine other use cases (e.g. imagining reading log
>> entries and wanting to keep the entries sorted by LSN).
>>
>> On Mon, Nov 22, 2021 at 1:12 PM David Morávek  wrote:
>>
>>> Hi Reuven,
>>>
>>> I've put together a little POC of the Flink side support [1] for
>>> OrderListState over the weekend, PTAL. Overall I'd say it should be pretty
>>> straightforward and backward compatible change.
>>>
>>> This effort will definitely require creating a new FLIP and making a
>>> good case for the Flink community. Would you be willing to participate in
>>> this contribution?
>>>
>>> [1]
>>> https://github.com/dmvk/flink/commit/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Nov 16, 2021 at 8:34 PM David Morávek  wrote:
>>>
>>>> Yes, my intuition is that it should be more or less the same. There
>>>> might be some penalty in terms of having more keys in the RocksDB, eg. on
>>>> the delete code path you need to write more tombstones + compaction, but
>>>> other than that I can't think of anything.
>>>>
>>>> You'd still need to sort all the data in memory after reading them, but
>>>> in this particular case it should save some cycles.
>>>>
>>>> On Tue, Nov 16, 2021 at 8:00 PM Reuven Lax  wrote:
>>>>
>>>>> Right, but no worse than storing in ListState for that case, right?
>>>>>
>>>>> On Tue, Nov 16, 2021 at 10:44 AM David Morávek 
>>>>> wrote:
>>>>>
>>>>>> Based on what Aljoscha said, storing it in MapState would be better
>>>>>>> as in practice the map will be sorted (as long as the key encoding is 
>>>>>>> order
>>>>>>> preserving, and on quick glance InstantCoder is order preserving), so 
>>>>>>> I'll
>>>>>>> change that
>>>>>>
>>>>>>
>>>>>> This property only holds for RocksDB based state backend, so we
>>>>>> unfortunately can not leverage that. For example HeapStateBackend is 
>>>>>> backed
>>>>>> by a simple HashMap.
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Nov 16, 2021 at 6:17 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Sounds great!
>>>>>>>
>>>>>>> I implemented the naive approach, but my current impl is based on
>>>>>>> ListState, sorting it every time it's read. Based on what Aljoscha said,
>>>>>>> storing it in MapState would be better as in practice the map will be
>>>>>>> sorted (as long as the key encoding is order preserving, and on quick
>>>>>>> glance InstantCoder is order preserving), so I'll change that. This 
>>>>>>> still
>>>>>>> requires fetching the entire li

Re: Regarding Project proposal review and feedback

2023-04-27 Thread David Morávek
Hi Siddharth,

Thanks for your interest in the Flink Runner for Beam. Reading through the
project, one thing that immediately strikes me is that there already is a
Flink runner based on DataStream and Operator (one level below DataStream)
API in the code base. Are you aware of this? If yes, how does the runner
you want to introduce differ from the existing one?

Best,
D.

On Sun, Apr 2, 2023 at 9:41 PM Svetak Sundhar via dev 
wrote:

> Hi Siddharth,
> I left some comments as well on the sentiment analysis proposal.
>
> Thanks,
>
>
> Svetak Sundhar
>
>   Technical Solutions Engineer, Data
> s vetaksund...@google.com
>
>
>
> On Sun, Apr 2, 2023 at 1:58 PM Anand Inguva via dev 
> wrote:
>
>> I left some comments on the sentiment analysis proposal.
>>
>> Thanks,
>> Anand
>>
>> On Thu, Mar 30, 2023 at 9:59 AM Danny McCormick via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Thanks Siddharth! I left some comments on the sentiment analysis
>>> proposal, I am probably not the best person to comment on the flink
>>> datastream api one though.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Fri, Mar 24, 2023 at 11:53 PM Siddharth Aryan <
>>> siddhartharyan...@gmail.com> wrote:
>>>
 Hello ,
 I am Siddharth Aryan a undergrad and I am looking forward to someone
 who can help me reviewing my proposal and give me a feedback on the them
 which help me to create a good proposal.
 Here ,I am attaching my both the project proposals:
 >Sentimental Analysis Pipeline with the help of Machine Learnig:

 https://docs.google.com/document/d/1U6zcXAWsDCrWlbf14f5VlLqPZFucwXR48tD7mrERW-g/edit?usp=sharing

 >Integrating Apache Beam with Flink Datastream API:

 https://docs.google.com/document/d/1sQEe9eVuoHX9QWS9Zj5wVl7MLmfk7QO09pjZOsk-TFY/edit?usp=sharing

 Best Regards
 Siddharth Aryan

 Github :https://github.com/nervoussidd

>>>


Re: Regarding Project proposal review and feedback

2023-04-29 Thread David Morávek
@Siddharth


> Secondly, the existing runner does not support Beam's windowing and
> triggering semantics in the Flink DataStream API


Can you please expand on this? AFAIK the current Flink runner is feature
complete w.r.t. windowing support.

which provides more control over the execution of the pipeline but requires
> more manual configuration
>

What manual configuration are you referring to? The user should be
completely abstracted away from the Flink APIs. If this is not the case,
it's something to be fixed.

@Jeff

one idea in my mind is to integrate the beam sql api with flink table api
>

(this might be a bit outdated, please correct me if I'm wrong) AFAIK Beam
SQL is just a DSL over Beam's low-level APIs. This makes introducing new
runners fairly straightforward because you only need to support a few
primitive transformations to have a fully working runner (even though it
might not be optimal performance-wise); everything else is just built on
top of those. This prevents us from directly translating Beam SQL into
Flink SQL / Flink Table API (you always need to go through Beam's low-level
API).


Unless there is a good reason, we should not introduce another Flink runner
into the Beam codebase. The current one, which is very advanced
feature-wise (it took years to get there), already has a need for more
maintainers. If you see weak spots in the current implementation, would you
consider improving the existing runner instead of trying to write a new one
from scratch?

Best,
D.

On Fri, Apr 28, 2023 at 11:29 AM Siddharth Aryan <
siddhartharyan...@gmail.com> wrote:

> Hello Jeff,
> Thank you for the idea, as it will allow beam users to write sql
> queries using the Beam SQL API and execute them on the Flink Table API.I
> will look into it later as my current focus is to implement an integration
> between Apache Beam and the Flink DataStream API. While the existing Flink
> runner is based on DataStream and Operator API, my project aims to create a
> new runner that specifically utilizes the Flink DataStream API.
> And thanks for the feedback.
>
> Best Regards,
> Siddharth Aryan
>
> On Thu, Apr 27, 2023 at 1:39 PM Jeff Zhang  wrote:
>
>> Same question as David,  one idea in my mind is to integrate the beam sql
>> api with flink table api, this does not exist in the current flink runner.
>>
>> On Thu, Apr 27, 2023 at 3:46 PM David Morávek  wrote:
>>
>>> Hi Siddharth,
>>>
>>> Thanks for your interest in the Flink Runner for Beam. Reading through
>>> the project, one thing that immediately strikes me is that there already is
>>> a Flink runner based on DataStream and Operator (one level below
>>> DataStream) API in the code base. Are you aware of this? If yes, how does
>>> the runner you want to introduce differ from the existing one?
>>>
>>> Best,
>>> D.
>>>
>>> On Sun, Apr 2, 2023 at 9:41 PM Svetak Sundhar via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Hi Siddharth,
>>>> I left some comments as well on the sentiment analysis proposal.
>>>>
>>>> Thanks,
>>>>
>>>>
>>>> Svetak Sundhar
>>>>
>>>>   Technical Solutions Engineer, Data
>>>> s vetaksund...@google.com
>>>>
>>>>
>>>>
>>>> On Sun, Apr 2, 2023 at 1:58 PM Anand Inguva via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> I left some comments on the sentiment analysis proposal.
>>>>>
>>>>> Thanks,
>>>>> Anand
>>>>>
>>>>> On Thu, Mar 30, 2023 at 9:59 AM Danny McCormick via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Thanks Siddharth! I left some comments on the sentiment analysis
>>>>>> proposal, I am probably not the best person to comment on the flink
>>>>>> datastream api one though.
>>>>>>
>>>>>> Thanks,
>>>>>> Danny
>>>>>>
>>>>>> On Fri, Mar 24, 2023 at 11:53 PM Siddharth Aryan <
>>>>>> siddhartharyan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello ,
>>>>>>> I am Siddharth Aryan a undergrad and I am looking forward to someone
>>>>>>> who can help me reviewing my proposal and give me a feedback on the them
>>>>>>> which help me to create a good proposal.
>>>>>>> Here ,I am attaching my both the project proposals:
>> >>>>>>> >Sentimental Analysis Pipeline with the help of Machine Learning:
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1U6zcXAWsDCrWlbf14f5VlLqPZFucwXR48tD7mrERW-g/edit?usp=sharing
>>>>>>>
>>>>>>> >Integrating Apache Beam with Flink Datastream API:
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1sQEe9eVuoHX9QWS9Zj5wVl7MLmfk7QO09pjZOsk-TFY/edit?usp=sharing
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Siddharth Aryan
>>>>>>>
>>>>>>> Github :https://github.com/nervoussidd
>>>>>>>
>>>>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>