Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread Rui Wang
I see.

Actually, I was still referring to making "LookupStream" a PCollectionView to
perform a side-input join, which then doesn't have the mismatched-WindowFn
problem. Otherwise, we would have to special-case WindowFns to decide whether
to perform a side-input join of two unbounded PCollections when their WindowFns
do not match, which we should avoid.

And what "data completeness" really means is that the side input is triggered,
so it can change; the question then is, when the side input changes, should we
refine the previously emitted results? That becomes harder to reason about.


Rui
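
For reference, a rough sketch (not from this thread) of the side-input join
being referred to, following the slowly-updating global-window side input
pattern linked later in the thread. The pipeline and mainStream variables, the
"id" join key, the refresh rate, the string-valued lookup table, and the
readLookupTable() helper are all illustrative assumptions:

import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// Periodically re-read the lookup table and expose the latest snapshot as a
// map-valued singleton side input living in the global window.
PCollectionView<Map<String, String>> lookupView = pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(Window.<Long>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes())
    .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(readLookupTable()); // hypothetical helper: full id -> value snapshot
      }
    }))
    .apply(View.asSingleton());

// The main stream keeps its own (non-global) windowing; each element joins
// against whatever snapshot of the lookup table was fired last.
PCollection<KV<Row, String>> joined = mainStream.apply("SideInputJoin",
    ParDo.of(new DoFn<Row, KV<Row, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String lookup = c.sideInput(lookupView).get(c.element().getString("id"));
        if (lookup != null) {
          c.output(KV.of(c.element(), lookup));
        }
      }
    }).withSideInputs(lookupView));

Because the lookup side is consumed as a PCollectionView rather than joined
directly, the mismatched-WindowFn check never comes into play.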

On Thu, Jul 25, 2019 at 6:17 PM rahul patwari 
wrote:

> "*In terms of Join schematic, I think it's hard to reason data
> completeness since one side of the join is changing*"
> - As it is possible to apply [Global Windows with Non-Default Trigger] to
> Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
> the condition that one of the PCollection being Joined have WindowFn as
> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
> pastFirstElementInPane())] is it sufficient to perform the Join of
> "MainStream" and this "LookupStream"?
>
> In other words, I mean to say that instead of directly throwing Exception
> 
>  when
> Joining two Unbounded PCollections with different WindowFns, If we can
> ensure that
> MainStream: one side of the join is Unbounded with WindowFn as [Non-Global
> Windows with DefaultTrigger] and
> LookupStream: the other side of the Join is a "Slowly Changing Lookup
> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
> pastFirstElementInPane()) Trigger],
> we can directly perform a SideInputJoin.
>
> Will we have "data completeness" problem even in "Slowly Changing lookup
> Cache Pattern"?
>
> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:
>
>> To be more clear, I think it's useful if we can achieve the following
>> that you wrote
>>
>> PCollection mainStream = ...;
>> PCollection lookupStream = ...;
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> new TupleTag("LookupTable"));
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> -Rui
>>
>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:
>>
>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>> the slow changing table join problem.
>>>
>>> To your question: "Can we implement SideInputJoin for this case", there
>>> are two perspectives.
>>>
>>> In terms of implementing the slowing changing lookup cache pattern
>>> 
>>>  in
>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>> exploring it until we identify blockers. I also think this pattern is
>>> already useful to users.
>>>
>>> In terms of Join schematic, I think it's hard to reason data
>>> completeness since one side of join is changing.
>>>
>>> -Rui
>>>
>>>
>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
 Hi Kenn,

 If we consider the following two *Unbounded* PCollections:
 - PCollection1 => [*Non-Global* Window with Default Trigger]
 - PCollection2 => [Global Window with *Non-Default* Trigger] :)
 coincidentally turned out to be the opposite

 Joining these two PCollections in BeamSql currently is not possible
 because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
 Mismatch)
 But in this case, PCollection1 can be joined with PCollection2 using
 SideInputJoin (
 https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
 which is being done for Joining an Unbounded PCollection with Bounded
 PCollection. I am thinking that Beam can guarantee it joins all input
 elements once per window for this case.
 The result of the join might be fuzzy for the window when the Trigger
 for PCollection2 fires and sideinput gets loaded into Memory.

 PCollection2 can be considered as Slowly Changing Lookup Cache and
 BeamSql can support Pattern:
 https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
 which is currently not possible.
 I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
 BeamSql to natively support PCollectionView so that BeamSql supports
 "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
 TableProvider.

 If we can support this, User will be able to do:
 PCollection mainStream = ...;
 PCollection lookupStream = ...;
 PCollectionTuple tuple = 

Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-25 Thread Andres Angel
Awesome Pablo thanks so much!!!

AU

On Thu, Jul 25, 2019 at 7:48 PM Pablo Estrada  wrote:

> Thanks for those who tuned in : ) - I feel like I might have spent too
> long fiddling with Python code, and not long enough doing setup, testing,
> etc. I will try to do another one where I just test / setup the environment
> / lint checks etc.
>
> Here are links for:
> Setting up the Python environment: https://youtu.be/xpIpEO4PUDo?t=334
> Quickly setting up the Java environment:
> https://youtu.be/xpIpEO4PUDo?t=3659
>
> Doing a Pull Request: https://youtu.be/xpIpEO4PUDo?t=3770
>
> On Thu, Jul 25, 2019 at 4:39 PM sridhar inuog 
> wrote:
>
>> Thanks, Pablo for organizing this session. I found it useful.
>>
>> On Thu, Jul 25, 2019 at 4:56 PM Pablo Estrada  wrote:
>>
>>> The link is here: https://www.youtube.com/watch?v=xpIpEO4PUDo
>>> This is still happening.
>>>
>>> On Thu, Jul 25, 2019 at 2:55 PM Innocent Djiofack 
>>> wrote:
>>>
 Did I miss the link or this was postponed?

 On Tue, Jul 23, 2019 at 3:05 PM Austin Bennett <
 whatwouldausti...@gmail.com> wrote:

> Pablo,
>
> Assigned  https://issues.apache.org/jira/browse/BEAM-7607 to you, to
> make even more likely that it is still around on the 25th :-)
>
> Cheers,
> Austin
>
> On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada 
> wrote:
>
>> Hi all,
>> I've just realized that
>> https://issues.apache.org/jira/browse/BEAM-7607 is a single-line
>> change - and we'd spend 40 minutes chitchatting, so I'll also be working 
>> on
>> https://jira.apache.org/jira/browse/BEAM-7803, which is a Python
>> issue (also for the BigQuery sink!).
>> Thanks!
>> -P.
>>
>> On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada 
>> wrote:
>>
>>> Hello all,
>>>
>>> This will be streamed on youtube on this link:
>>> https://www.youtube.com/watch?v=xpIpEO4PUDo
>>>
>>> I think there will be a live chat, so I will hopefully be available
>>> to answer questions. To be honest, my workflow is not super efficient,
>>> but... oh well, hopefully it will be at least somewhat helpful to 
>>> others : )
>>> Best
>>> -P.
>>>
>>> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>>>
 +1, I'd love to see this as a recording. Will you stick it up on
 youtube afterwards?

 On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog <
 sridharin...@gmail.com> wrote:

> Thanks, Pablo! Looking forward to it! Hopefully, it will also be
> recorded as well.
>
> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
> wrote:
>
>> Yes! So I will be working on a small feature request for Java's
>> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>>
>> Maybe I'll do something for Python next month. : )
>> Best
>> -P.
>>
>> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar <
>> rakeshku...@lyft.com> wrote:
>>
>>> +1, I really appreciate this initiative. It would be really
>>> helpful newbies like me.
>>>
>>> Is it possible to list out what are the things that you are
>>> planning to cover?
>>>
>>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
>>> wrote:
>>>
 Thanks for organizing this Pablo, it'll be very helpful!

 On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada <
 pabl...@google.com> wrote:

> Hello all,
> I'll be having a session where I live-fix a Beam bug for 1
> hour next week. Everyone is invited.
>
> It will be on July 25, between 3:30pm and 4:30pm PST.
> Hopefully I will finish a full change in that time frame, but 
> we'll see.
>
> I have not yet decided if I will do this via hangouts, or via
> a youtube livestream. In any case, I will share the link here in 
> the next
> few days.
>
> I will most likely work on the Java SDK (I have a little
> feature request in mind).
>
> Thanks!
> -P.
>


 --

 *DJIOFACK INNOCENT*
 *"Be better than the day before!" -*
 *+1 404 751 8024*

>>>


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread rahul patwari
"*In terms of Join schematic, I think it's hard to reason data completeness
since one side of the join is changing*"
> - As it is possible to apply [Global Windows with a Non-Default Trigger] to an
> Unbounded Data Source, say Kafka, to distinguish that Kafka PCollection
> from the "Slowly Changing lookup cache" Unbounded PCollection: if we can check
> the condition that one of the PCollections being joined has a WindowFn of
> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
> pastFirstElementInPane())], is it sufficient to perform the Join of
> "MainStream" and this "LookupStream"?

On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:

> To be more clear, I think it's useful if we can achieve the following that
> you wrote
>
> PCollection mainStream = ...;
> PCollection lookupStream = ...;
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> new TupleTag("LookupTable"));
> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>
> -Rui
>
> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:
>
>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
>> slow changing table join problem.
>>
>> To your question: "Can we implement SideInputJoin for this case", there
>> are two perspectives.
>>
>> In terms of implementing the slowing changing lookup cache pattern
>> 
>>  in
>> BeamSQL, such sidinput join can be done that way. At least it worth
>> exploring it until we identify blockers. I also think this pattern is
>> already useful to users.
>>
>> In terms of Join schematic, I think it's hard to reason data completeness
>> since one side of join is changing.
>>
>> -Rui
>>
>>
>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi Kenn,
>>>
>>> If we consider the following two *Unbounded* PCollections:
>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>> coincidentally turned out to be the opposite
>>>
>>> Joining these two PCollections in BeamSql currently is not possible
>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>> Mismatch)
>>> But in this case, PCollection1 can be joined with PCollection2 using
>>> SideInputJoin (
>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>> which is being done for Joining an Unbounded PCollection with Bounded
>>> PCollection. I am thinking that Beam can guarantee it joins all input
>>> elements once per window for this case.
>>> The result of the join might be fuzzy for the window when the Trigger
>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>
>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>> BeamSql can support Pattern:
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>> which is currently not possible.
>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>> TableProvider.
>>>
>>> If we can support this, User will be able to do:
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable"));
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> Can we implement SideInputJoin for this case?
>>> I might be wrong in my understanding. Please let me know your thoughts.
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles  wrote:
>>>
 I think the best way to approach this is probably to have an example
 SQL statement and to discuss what the relational semantics should be.

 Windowing is not really part of SQL (yet) and in a way it just needs
 very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
 proposal for SQL, windowed ag

Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-25 Thread Pablo Estrada
Thanks for those who tuned in : ) - I feel like I might have spent too long
fiddling with Python code, and not long enough doing setup, testing, etc. I
will try to do another one where I just test / setup the environment / lint
checks etc.

Here are links for:
Setting up the Python environment: https://youtu.be/xpIpEO4PUDo?t=334
Quickly setting up the Java environment: https://youtu.be/xpIpEO4PUDo?t=3659

Doing a Pull Request: https://youtu.be/xpIpEO4PUDo?t=3770

On Thu, Jul 25, 2019 at 4:39 PM sridhar inuog 
wrote:

> Thanks, Pablo for organizing this session. I found it useful.
>
> On Thu, Jul 25, 2019 at 4:56 PM Pablo Estrada  wrote:
>
>> The link is here: https://www.youtube.com/watch?v=xpIpEO4PUDo
>> This is still happening.
>>
>> On Thu, Jul 25, 2019 at 2:55 PM Innocent Djiofack 
>> wrote:
>>
>>> Did I miss the link or this was postponed?
>>>
>>> On Tue, Jul 23, 2019 at 3:05 PM Austin Bennett <
>>> whatwouldausti...@gmail.com> wrote:
>>>
 Pablo,

 Assigned  https://issues.apache.org/jira/browse/BEAM-7607 to you, to
 make even more likely that it is still around on the 25th :-)

 Cheers,
 Austin

 On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada 
 wrote:

> Hi all,
> I've just realized that
> https://issues.apache.org/jira/browse/BEAM-7607 is a single-line
> change - and we'd spend 40 minutes chitchatting, so I'll also be working 
> on
> https://jira.apache.org/jira/browse/BEAM-7803, which is a Python
> issue (also for the BigQuery sink!).
> Thanks!
> -P.
>
> On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada 
> wrote:
>
>> Hello all,
>>
>> This will be streamed on youtube on this link:
>> https://www.youtube.com/watch?v=xpIpEO4PUDo
>>
>> I think there will be a live chat, so I will hopefully be available
>> to answer questions. To be honest, my workflow is not super efficient,
>> but... oh well, hopefully it will be at least somewhat helpful to others 
>> : )
>> Best
>> -P.
>>
>> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>>
>>> +1, I'd love to see this as a recording. Will you stick it up on
>>> youtube afterwards?
>>>
>>> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog <
>>> sridharin...@gmail.com> wrote:
>>>
 Thanks, Pablo! Looking forward to it! Hopefully, it will also be
 recorded as well.

 On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
 wrote:

> Yes! So I will be working on a small feature request for Java's
> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>
> Maybe I'll do something for Python next month. : )
> Best
> -P.
>
> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar <
> rakeshku...@lyft.com> wrote:
>
>> +1, I really appreciate this initiative. It would be really
>> helpful newbies like me.
>>
>> Is it possible to list out what are the things that you are
>> planning to cover?
>>
>>
>>
>>
>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
>> wrote:
>>
>>> Thanks for organizing this Pablo, it'll be very helpful!
>>>
>>> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada <
>>> pabl...@google.com> wrote:
>>>
 Hello all,
 I'll be having a session where I live-fix a Beam bug for 1 hour
 next week. Everyone is invited.

 It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully
 I will finish a full change in that time frame, but we'll see.

 I have not yet decided if I will do this via hangouts, or via a
 youtube livestream. In any case, I will share the link here in the 
 next few
 days.

 I will most likely work on the Java SDK (I have a little
 feature request in mind).

 Thanks!
 -P.

>>>
>>>
>>> --
>>>
>>> *DJIOFACK INNOCENT*
>>> *"Be better than the day before!" -*
>>> *+1 404 751 8024*
>>>
>>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-25 Thread sridhar inuog
Thanks, Pablo for organizing this session. I found it useful.

On Thu, Jul 25, 2019 at 4:56 PM Pablo Estrada  wrote:

> The link is here: https://www.youtube.com/watch?v=xpIpEO4PUDo
> This is still happening.
>
> On Thu, Jul 25, 2019 at 2:55 PM Innocent Djiofack 
> wrote:
>
>> Did I miss the link or this was postponed?
>>
>> On Tue, Jul 23, 2019 at 3:05 PM Austin Bennett <
>> whatwouldausti...@gmail.com> wrote:
>>
>>> Pablo,
>>>
>>> Assigned  https://issues.apache.org/jira/browse/BEAM-7607 to you, to
>>> make even more likely that it is still around on the 25th :-)
>>>
>>> Cheers,
>>> Austin
>>>
>>> On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada 
>>> wrote:
>>>
 Hi all,
 I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is
 a single-line change - and we'd spend 40 minutes chitchatting, so I'll also
 be working on https://jira.apache.org/jira/browse/BEAM-7803, which is
 a Python issue (also for the BigQuery sink!).
 Thanks!
 -P.

 On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada 
 wrote:

> Hello all,
>
> This will be streamed on youtube on this link:
> https://www.youtube.com/watch?v=xpIpEO4PUDo
>
> I think there will be a live chat, so I will hopefully be available to
> answer questions. To be honest, my workflow is not super efficient, but...
> oh well, hopefully it will be at least somewhat helpful to others : )
> Best
> -P.
>
> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>
>> +1, I'd love to see this as a recording. Will you stick it up on
>> youtube afterwards?
>>
>> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
>> wrote:
>>
>>> Thanks, Pablo! Looking forward to it! Hopefully, it will also be
>>> recorded as well.
>>>
>>> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
>>> wrote:
>>>
 Yes! So I will be working on a small feature request for Java's
 BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607

 Maybe I'll do something for Python next month. : )
 Best
 -P.

 On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
 wrote:

> +1, I really appreciate this initiative. It would be really
> helpful newbies like me.
>
> Is it possible to list out what are the things that you are
> planning to cover?
>
>
>
>
> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
> wrote:
>
>> Thanks for organizing this Pablo, it'll be very helpful!
>>
>> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada <
>> pabl...@google.com> wrote:
>>
>>> Hello all,
>>> I'll be having a session where I live-fix a Beam bug for 1 hour
>>> next week. Everyone is invited.
>>>
>>> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully
>>> I will finish a full change in that time frame, but we'll see.
>>>
>>> I have not yet decided if I will do this via hangouts, or via a
>>> youtube livestream. In any case, I will share the link here in the 
>>> next few
>>> days.
>>>
>>> I will most likely work on the Java SDK (I have a little feature
>>> request in mind).
>>>
>>> Thanks!
>>> -P.
>>>
>>
>>
>> --
>>
>> *DJIOFACK INNOCENT*
>> *"Be better than the day before!" -*
>> *+1 404 751 8024*
>>
>


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-25 Thread Rui Wang
Tried to verify RC1 by running Nexmark on Dataflow but found it's broken
(at least based on the commands from Running+Nexmark). Will
try to debug it and rerun the process.


-Rui

On Thu, Jul 25, 2019 at 2:39 PM Anton Kedin  wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version 2.14.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2], which is signed with the key with fingerprint
> 89E2FFCAE7E99CF6E6827CFEF7349F2310FFB193 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.14.0-RC1" [5], [6]
> * website pull request listing the release [7], publishing the API
> reference manual [8].
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2].
> * Validation sheet with a tab for 2.14.0 release to help with validation
> [9].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Anton
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12345431
> [2] https://dist.apache.org/repos/dist/dev/beam/2.14.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4] https://repository.apache.org/content/repositories/orgapachebeam-1080/
> [5] https://github.com/apache/beam/tree/v2.14.0-RC1
> [6] https://github.com/apache/beam/tags
> [7] https://github.com/apache/beam/pull/9157
> [8] https://github.com/apache/beam-site/pull/591/
> [9] https://s.apache.org/beam-release-validation#gid=1082148452
>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-25 Thread Pablo Estrada
The link is here: https://www.youtube.com/watch?v=xpIpEO4PUDo
This is still happening.

On Thu, Jul 25, 2019 at 2:55 PM Innocent Djiofack 
wrote:

> Did I miss the link or this was postponed?
>
> On Tue, Jul 23, 2019 at 3:05 PM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> Pablo,
>>
>> Assigned  https://issues.apache.org/jira/browse/BEAM-7607 to you, to
>> make even more likely that it is still around on the 25th :-)
>>
>> Cheers,
>> Austin
>>
>> On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada 
>> wrote:
>>
>>> Hi all,
>>> I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is
>>> a single-line change - and we'd spend 40 minutes chitchatting, so I'll also
>>> be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a
>>> Python issue (also for the BigQuery sink!).
>>> Thanks!
>>> -P.
>>>
>>> On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada 
>>> wrote:
>>>
 Hello all,

 This will be streamed on youtube on this link:
 https://www.youtube.com/watch?v=xpIpEO4PUDo

 I think there will be a live chat, so I will hopefully be available to
 answer questions. To be honest, my workflow is not super efficient, but...
 oh well, hopefully it will be at least somewhat helpful to others : )
 Best
 -P.

 On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:

> +1, I'd love to see this as a recording. Will you stick it up on
> youtube afterwards?
>
> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
> wrote:
>
>> Thanks, Pablo! Looking forward to it! Hopefully, it will also be
>> recorded as well.
>>
>> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
>> wrote:
>>
>>> Yes! So I will be working on a small feature request for Java's
>>> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>>>
>>> Maybe I'll do something for Python next month. : )
>>> Best
>>> -P.
>>>
>>> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
>>> wrote:
>>>
 +1, I really appreciate this initiative. It would be really helpful
 newbies like me.

 Is it possible to list out what are the things that you are
 planning to cover?




 On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
 wrote:

> Thanks for organizing this Pablo, it'll be very helpful!
>
> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
> wrote:
>
>> Hello all,
>> I'll be having a session where I live-fix a Beam bug for 1 hour
>> next week. Everyone is invited.
>>
>> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I
>> will finish a full change in that time frame, but we'll see.
>>
>> I have not yet decided if I will do this via hangouts, or via a
>> youtube livestream. In any case, I will share the link here in the 
>> next few
>> days.
>>
>> I will most likely work on the Java SDK (I have a little feature
>> request in mind).
>>
>> Thanks!
>> -P.
>>
>
>
> --
>
> *DJIOFACK INNOCENT*
> *"Be better than the day before!" -*
> *+1 404 751 8024*
>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-25 Thread Innocent Djiofack
Did I miss the link or this was postponed?

On Tue, Jul 23, 2019 at 3:05 PM Austin Bennett 
wrote:

> Pablo,
>
> Assigned  https://issues.apache.org/jira/browse/BEAM-7607 to you, to make
> even more likely that it is still around on the 25th :-)
>
> Cheers,
> Austin
>
> On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada  wrote:
>
>> Hi all,
>> I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is
>> a single-line change - and we'd spend 40 minutes chitchatting, so I'll also
>> be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a
>> Python issue (also for the BigQuery sink!).
>> Thanks!
>> -P.
>>
>> On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada  wrote:
>>
>>> Hello all,
>>>
>>> This will be streamed on youtube on this link:
>>> https://www.youtube.com/watch?v=xpIpEO4PUDo
>>>
>>> I think there will be a live chat, so I will hopefully be available to
>>> answer questions. To be honest, my workflow is not super efficient, but...
>>> oh well, hopefully it will be at least somewhat helpful to others : )
>>> Best
>>> -P.
>>>
>>> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>>>
 +1, I'd love to see this as a recording. Will you stick it up on
 youtube afterwards?

 On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
 wrote:

> Thanks, Pablo! Looking forward to it! Hopefully, it will also be
> recorded as well.
>
> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
> wrote:
>
>> Yes! So I will be working on a small feature request for Java's
>> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>>
>> Maybe I'll do something for Python next month. : )
>> Best
>> -P.
>>
>> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
>> wrote:
>>
>>> +1, I really appreciate this initiative. It would be really helpful
>>> newbies like me.
>>>
>>> Is it possible to list out what are the things that you are planning
>>> to cover?
>>>
>>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
>>> wrote:
>>>
 Thanks for organizing this Pablo, it'll be very helpful!

 On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
 wrote:

> Hello all,
> I'll be having a session where I live-fix a Beam bug for 1 hour
> next week. Everyone is invited.
>
> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I
> will finish a full change in that time frame, but we'll see.
>
> I have not yet decided if I will do this via hangouts, or via a
> youtube livestream. In any case, I will share the link here in the 
> next few
> days.
>
> I will most likely work on the Java SDK (I have a little feature
> request in mind).
>
> Thanks!
> -P.
>


-- 

*DJIOFACK INNOCENT*
*"Be better than the day before!" -*
*+1 404 751 8024*


[VOTE] Release 2.14.0, release candidate #1

2019-07-25 Thread Anton Kedin
Hi everyone,
Please review and vote on the release candidate #1 for the version 2.14.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which is signed with the key with fingerprint
89E2FFCAE7E99CF6E6827CFEF7349F2310FFB193 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.14.0-RC1" [5], [6]
* website pull request listing the release [7], publishing the API
reference manual [8].
* Python artifacts are deployed along with the source release to the
dist.apache.org [2].
* Validation sheet with a tab for 2.14.0 release to help with validation
[9].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Anton

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12345431
[2] https://dist.apache.org/repos/dist/dev/beam/2.14.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1080/
[5] https://github.com/apache/beam/tree/v2.14.0-RC1
[6] https://github.com/apache/beam/tags
[7] https://github.com/apache/beam/pull/9157
[8] https://github.com/apache/beam-site/pull/591/
[9] https://s.apache.org/beam-release-validation#gid=1082148452


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread Rui Wang
To be more clear, I think it's useful if we can achieve the following that
you wrote

PCollection<Row> mainStream = ...;
PCollection<Row> lookupStream = ...;
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"), new
TupleTag("LookupTable"));
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

-Rui

On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:

> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
> slow changing table join problem.
>
> To your question: "Can we implement SideInputJoin for this case", there
> are two perspectives.
>
> In terms of implementing the slowing changing lookup cache pattern
> 
>  in
> BeamSQL, such sidinput join can be done that way. At least it worth
> exploring it until we identify blockers. I also think this pattern is
> already useful to users.
>
> In terms of Join schematic, I think it's hard to reason data completeness
> since one side of join is changing.
>
> -Rui
>
>
> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari 
> wrote:
>
>> Hi Kenn,
>>
>> If we consider the following two *Unbounded* PCollections:
>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>> coincidentally turned out to be the opposite
>>
>> Joining these two PCollections in BeamSql currently is not possible
>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>> Mismatch)
>> But in this case, PCollection1 can be joined with PCollection2 using
>> SideInputJoin (
>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>> which is being done for Joining an Unbounded PCollection with Bounded
>> PCollection. I am thinking that Beam can guarantee it joins all input
>> elements once per window for this case.
>> The result of the join might be fuzzy for the window when the Trigger for
>> PCollection2 fires and sideinput gets loaded into Memory.
>>
>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>> BeamSql can support Pattern:
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>> which is currently not possible.
>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>> BeamSql to natively support PCollectionView so that BeamSql supports
>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>> TableProvider.
>>
>> If we can support this, User will be able to do:
>> PCollection mainStream = ...;
>> PCollection lookupStream = ...;
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> new TupleTag("LookupTable"));
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> Can we implement SideInputJoin for this case?
>> I might be wrong in my understanding. Please let me know your thoughts.
>>
>> Thanks,
>> Rahul
>>
>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles  wrote:
>>
>>> I think the best way to approach this is probably to have an example SQL
>>> statement and to discuss what the relational semantics should be.
>>>
>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
>>> BY operation, where you GROUP BY window columns that were added. So it is
>>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>>> problem of them being incompatible.
>>>
>>> With Beam SQL there are basically two ways of windowing that work
>>> totally differently:
>>>
>>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>>> input PCollection windowfn
>>> 2. PCollection windowing where the SQL does not do any windowing - this
>>> should apply the SQL expression to each window independently
>>>
>>> In order to support a hybrid of these, it might be:
>>>
>>> 3. SQL style windowing, where when a PCollection has window assigned,
>>> the window columns are added before the SQL is applied. It is a bit strange
>>> but might enable your use.
>>>
>>> Kenn
>>>
>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
 Hi,

 Beam currently doesn't support Join of Unbounded PCollections of
 different WindowFns (
 https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
 ).

 BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
 performing 'SideInputJoin' with Bounded PCollection as a SideInput.

 Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
 when one of the Unbounded PCollection has [GlobalWindows Applied with
 Non-Default Trigger(probably

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari
 wrote:
>
> So, if an RPC call has to be performed for a batch of Rows (a PCollection<Row>),
> instead of for each Row, the recommended way is to batch the Rows in
> startBundle() of a
> DoFn (https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.
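
A minimal sketch of that per-bundle batching (not from the thread): callRpc()
stands in for the real batched client call and is assumed to return one result
per input row.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

class BatchedRpcFn extends DoFn<Row, Row> {
  // Buffer the bundle's elements along with their timestamps and windows so the
  // flush in @FinishBundle can emit results back into the correct windows.
  private transient List<Row> rows;
  private transient List<Instant> timestamps;
  private transient List<BoundedWindow> windows;

  @StartBundle
  public void startBundle() {
    rows = new ArrayList<>();
    timestamps = new ArrayList<>();
    windows = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element Row row, @Timestamp Instant ts, BoundedWindow window) {
    rows.add(row);
    timestamps.add(ts);
    windows.add(window);
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    if (rows.isEmpty()) {
      return;
    }
    List<Row> results = callRpc(rows); // one RPC for the whole bundle
    for (int i = 0; i < results.size(); i++) {
      ctx.output(results.get(i), timestamps.get(i), windows.get(i));
    }
  }

  private List<Row> callRpc(List<Row> batch) {
    // Placeholder for the real batched client call.
    throw new UnsupportedOperationException("illustrative only");
  }
}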

> I thought Stateful and Timely Processing could be helpful here.

The upside is that you can persist state across bundles (which is
especially helpful when bundles are small, e.g. for streaming
pipelines). The downside is that you can't persist state across keys
(and it also enforces a shuffle to colocate the data by key).

If you get to choose your keys, you would want to have about as many
keys as you have concurrent bundles (or some small multiple, to ensure
they're not lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be
sufficient.
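
A rough sketch of that keying trick, assuming rows is a PCollection<Row>:

PCollection<KV<Integer, Row>> keyed = rows.apply(
    ParDo.of(new DoFn<Row, KV<Integer, Row>>() {
      @ProcessElement
      public void processElement(@Element Row row, OutputReceiver<KV<Integer, Row>> out) {
        // One key per DoFn instance, so downstream stateful/batching logic sees
        // roughly as many keys as there are concurrent bundles.
        out.output(KV.of(System.identityHashCode(this), row));
      }
    }));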

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an 
>> > example of assigning an arbitrary-but-consistent index to each element on 
>> > a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say, 
>> > PCollection with Fixed Windows, the state is maintained per window 
>> > and every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed in 
>> > a single DoFn Instance, which otherwise could have been done in multiple 
>> > parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>> >
>> > Thanks,
>> > Rahul


Re: Sort Merge Bucket - Action Items

2019-07-25 Thread Eugene Kirpichov
Hi Gleb,

Regarding the future of io.Read: ideally things would go as follows
- All runners support SDF at feature parity with Read (mostly this is just
the Dataflow runner's liquid sharding and size estimation for bounded
sources, and backlog for unbounded sources, but I recall that a couple of
other runners also used size estimation)
- Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden
to add any new implementations to SDK, and users shouldn't use them either
(note: I believe it's already effectively forbidden to use them for cases
where a DoFn/SDF at the current level of support will be sufficient)
- People one by one rewrite existing Bounded/UnboundedSource based
PTransforms in the SDK to use SDFs instead
- Read.from() is rewritten to use a wrapper SDF over the given Source, and
explicit support for Read is deleted from runners
- In the next major version of Beam - presumably 3.0 - the Read transform
itself is deleted

I don't know what's the current status of SDF/Read feature parity, maybe
Luke or Cham can comment. An alternative path is offered in
http://s.apache.org/sdf-via-source.


On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov  wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or we are always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll, both of them
> use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource I find it not necessary to use Source, it
> should be enough to have something like (KV,
> OutputReceiver), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put for myself to better understand the
> code.
>
> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<FileIO.ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile,
> OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<FileIO.ReadableFile,
> OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw 
> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and
>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> TFRecord, ...}. In other words composability of the API (vs. manually
>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> opportunities for (easy, clean) implementation sharing, that'd be
>> nice, but not the primary goal.
>>
>> (Similarly for reading, though that's seem less obvious. Certainly
>> whatever T is useful for ReadSmb(T) could be useful for a
>> (non-liquid-shading) ReadAll(T) however.)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li 
>> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
>> determined until the last finalize transform, which is again different from
>> the current SMB proposal (static number of buckets & shards).
>> >> I'll end up with more code specialized for SMB in order to generalize
>> existing sink code, which I think we all want to avoid.
>> >>
>> >> Seems the only option is duplicating some logic like temp file
>> handling, which is exactly what we did in the original PR.
>> >> I can reuse Compression & Sink for file level writes but that seems
>> about the most I can reuse right now.
>> >>
>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li 
>> wrote:
>> >>>
>> >>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms WriteFiles.
>> >>>
>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> >>> => FinalizeTempFileBundles extends
>> PTransform<PCollection<List<FileResult<DestinationT>>>,
>> WriteFilesResult<DestinationT>>
>> >>>
>> >>> I replaced FileResult with KV
>> so I can use pre-compute SMB destination file names for the transforms.
>> >>> I'm also thinking of parameterizing ShardedKey for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFil

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread Rui Wang
Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
slow changing table join problem.

To your question: "Can we implement SideInputJoin for this case", there are
two perspectives.

In terms of implementing the slowly changing lookup cache pattern in
BeamSQL, such a side-input join can be done that way. At least it is worth
exploring until we identify blockers. I also think this pattern is
already useful to users.

In terms of Join schematic, I think it's hard to reason about data completeness
since one side of the join is changing.

-Rui


On Thu, Jul 25, 2019 at 12:55 PM rahul patwari 
wrote:

> Hi Kenn,
>
> If we consider the following two *Unbounded* PCollections:
> - PCollection1 => [*Non-Global* Window with Default Trigger]
> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
> coincidentally turned out to be the opposite
>
> Joining these two PCollections in BeamSql currently is not possible
> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
> Mismatch)
> But in this case, PCollection1 can be joined with PCollection2 using
> SideInputJoin (
> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
> which is being done for Joining an Unbounded PCollection with Bounded
> PCollection. I am thinking that Beam can guarantee it joins all input
> elements once per window for this case.
> The result of the join might be fuzzy for the window when the Trigger for
> PCollection2 fires and sideinput gets loaded into Memory.
>
> PCollection2 can be considered as Slowly Changing Lookup Cache and BeamSql
> can support Pattern:
> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
> which is currently not possible.
> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
> to natively support PCollectionView so that BeamSql supports "Slowly
> Updating Global Window Sideinput Pattern" using SqlTransform's
> TableProvider.
>
> If we can support this, User will be able to do:
> PCollection mainStream = ...;
> PCollection lookupStream = ...;
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> new TupleTag("LookupTable"));
> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>
> Can we implement SideInputJoin for this case?
> I might be wrong in my understanding. Please let me know your thoughts.
>
> Thanks,
> Rahul
>
> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles  wrote:
>
>> I think the best way to approach this is probably to have an example SQL
>> statement and to discuss what the relational semantics should be.
>>
>> Windowing is not really part of SQL (yet) and in a way it just needs very
>> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
>> BY operation, where you GROUP BY window columns that were added. So it is
>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>> problem of them being incompatible.
>>
>> With Beam SQL there are basically two ways of windowing that work totally
>> differently:
>>
>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>> input PCollection windowfn
>> 2. PCollection windowing where the SQL does not do any windowing - this
>> should apply the SQL expression to each window independently
>>
>> In order to support a hybrid of these, it might be:
>>
>> 3. SQL style windowing, where when a PCollection has window assigned, the
>> window columns are added before the SQL is applied. It is a bit strange but
>> might enable your use.
>>
>> Kenn
>>
>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Beam currently doesn't support Join of Unbounded PCollections of
>>> different WindowFns (
>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>> ).
>>>
>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>
>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>> Non-Default Trigger(probably a slow-changing lookup cache
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>> by performing 'SideInputJoin'?
>>>
>>> Regards,
>>> Rahul
>>>
>>


Re: Sort Merge Bucket - Action Items

2019-07-25 Thread Claire McGinty
As far as I/O code re-use, the consensus seems to be to make the SMB module
as composable as possible using existing Beam components, ideally as-is or
with very basic tweaks.

To be clear, what I care about is that WriteFiles(X) and
> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> TFRecord, ...}. In other words composability of the API (vs. manually
> filling out the matrix). If WriteFiles and WriteSmbFiles find
> opportunities for (easy, clean) implementation sharing, that'd be
> nice, but not the primary goal.


 For SMB writes, it's a pretty easy change to parameterize them by
FileIO.Sink, for which there are already public implementations for
Avro/TFRecord/Parquet/Text! It'll remove a lot of code duplication from the
smb module.
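
For example, a bucket writer parameterized by FileIO.Sink could be as small as
the following sketch (not the actual SMB code; the method name and arguments
are illustrative):

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

static <T> void writeBucketFile(
    FileIO.Sink<T> sink, ResourceId file, Iterable<T> elements, Compression compression)
    throws IOException {
  // All format-specific work (Avro/Parquet/Text/TFRecord) lives inside the sink.
  try (WritableByteChannel channel =
      compression.writeCompressed(FileSystems.create(file, "application/octet-stream"))) {
    sink.open(channel);
    for (T element : elements) {
      sink.write(element);
    }
    sink.flush();
  }
}

AvroIO.sink(...), TextIO.sink(), ParquetIO.sink(...) and TFRecordIO.sink()
could then be plugged in without the SMB module knowing about any of the
formats.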

(Similarly for reading, though that's seem less obvious. Certainly
> whatever T is useful for ReadSmb(T) could be useful for a
> (non-liquid-shading) ReadAll(T) however.)


It seems like there isn't an easily composable equivalent for Reader that
isn't coupled with FileBasedSource (+1 on Gleb's question about the
long-term future of org.apache.beam.sdk.io.Read). One thing we could do to
improve parity between SMB reads and Beam's io module is to use ReadableFile as
our file handles (currently we just use plain ResourceIds + the FileSystems API
to open), so our Source transform would more closely resemble the existing
ReadFiles transforms. ReadableFile also brings in io.Compression and
handles opening a ReadableByteChannel for us. We'd still be re-implementing
the I/O operations that deserialize a bytestream into individual elements,
but this seems unavoidable for the time being.
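
A sketch of what that read side could look like with ReadableFile handles
(again not the actual SMB code; decodeRecords() stands in for the per-format
deserialization mentioned above):

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;

abstract class ReadBucketFileFn<T> extends DoFn<FileIO.ReadableFile, T> {
  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    FileIO.ReadableFile file = c.element();
    // ReadableFile takes care of Compression and of opening the channel.
    try (ReadableByteChannel channel = file.open()) {
      for (T record : decodeRecords(channel)) {
        c.output(record);
      }
    }
  }

  // Format-specific deserialization of the byte stream into elements.
  abstract Iterable<T> decodeRecords(ReadableByteChannel channel) throws IOException;
}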

Let me know what you think about these proposed modifications to SMB
read/write!

Thanks,
Claire

On Thu, Jul 25, 2019 at 9:39 AM Gleb Kanterov  wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or we are always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll, both of them
> use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource I find it not necessary to use Source, it
> should be enough to have something like (KV,
> OutputReceiver), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put for myself to better understand the
> code.
>
> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<FileIO.ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile,
> OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<FileIO.ReadableFile,
> OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw 
> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and
>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> TFRecord, ...}. In other words composability of the API (vs. manually
>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> opportunities for (easy, clean) implementation sharing, that'd be
>> nice, but not the primary goal.
>>
>> (Similarly for reading, though that's seem less obvious. Certainly
>> whatever T is useful for ReadSmb(T) could be useful for a
>> (non-liquid-shading) ReadAll(T) however.)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li 
>> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
>> determined until the last finalize transform, which is again different from
>> the current SMB proposal (static number

Re: [2.14.0] Release Progress Update

2019-07-25 Thread Anton Kedin
Planning to send out the RC1 within the next couple of hours.

Regards,
Anton

On Thu, Jul 25, 2019 at 1:21 PM Pablo Estrada  wrote:

> Hi Anton,
> are there updates on the release?
> Thanks!
> -P.
>
> On Fri, Jul 19, 2019 at 12:33 PM Anton Kedin  wrote:
>
>> Verification build succeeds except for AWS IO (which has tests hanging).
>> I will continue the release process as normal and will investigate the AWS
>> IO issue meanwhile. Will either disable the hanging tests to get the
>> artifacts for an RC or will continue without it temporarily, will need to
>> re-validate it when the issue is resolved.
>>
>> Regards,
>> Anton
>>
>> On Thu, Jul 18, 2019 at 8:54 AM Anton Kedin  wrote:
>>
>>> All cherry-picks are merged, blocker jiras closed, running the
>>> verification build.
>>>
>>> On Mon, Jul 15, 2019 at 4:53 PM Ahmet Altay  wrote:
>>>
 Anton, any updates on this release? Do you need help?

 On Fri, Jun 28, 2019 at 11:42 AM Anton Kedin  wrote:

> I have been running validation builds (had some hiccups with that),
> everything looks mostly good, except failures in `:beam-test-tools` and
> `:io:aws`. Now I will start cherry-picking other fixes and trying to 
> figure
> the specific issues out.
>
> Regards,
> Anton
>
> On Fri, Jun 21, 2019 at 3:17 PM Anton Kedin  wrote:
>
>> Not much progress today. Debugging build issues when running global
>> `./gradlew build -PisRelease --scan`
>>
>> Regards,
>> Anton
>>
>> On Thu, Jun 20, 2019 at 4:12 PM Anton Kedin  wrote:
>>
>>> Published the snapshots, working through the
>>> verify_release_validation script
>>>
>>> Got another blocker to be cherry-picked when merged:
>>> https://issues.apache.org/jira/browse/BEAM-7603
>>>
>>> Regards,
>>> Anton
>>>
>>>
>>> On Wed, Jun 19, 2019 at 4:17 PM Anton Kedin 
>>> wrote:
>>>
 I have cut the release branch for 2.14.0 and working through the
 release process. Next step is building the snapshot and release branch
 verification.

 There are two issues [1] that are still not resolved that are
 marked as blockers at the moment:
  * [2] BEAM-7478 - remote cluster submission from Flink Runner
 broken;
  * [3] BEAM-7424 - retries for GCS;

 [1]
 https://issues.apache.org/jira/browse/BEAM-7478?jql=project%20%3D%20BEAM%20%20AND%20fixVersion%20%3D%202.14.0%20AND%20status%20!%3D%20Closed%20AND%20status%20!%3DResolved
 [2] https://issues.apache.org/jira/browse/BEAM-7478
 [3] https://issues.apache.org/jira/browse/BEAM-7424

 Regards,
 Anton

>>>


Re: [2.14.0] Release Progress Update

2019-07-25 Thread Pablo Estrada
Hi Anton,
are there updates on the release?
Thanks!
-P.

On Fri, Jul 19, 2019 at 12:33 PM Anton Kedin  wrote:

> Verification build succeeds except for AWS IO (which has tests hanging). I
> will continue the release process as normal and will investigate the AWS IO
> issue meanwhile. Will either disable the hanging tests to get the artifacts
> for an RC or will continue without it temporarily, will need to re-validate
> it when the issue is resolved.
>
> Regards,
> Anton
>
> On Thu, Jul 18, 2019 at 8:54 AM Anton Kedin  wrote:
>
>> All cherry-picks are merged, blocker jiras closed, running the
>> verification build.
>>
>> On Mon, Jul 15, 2019 at 4:53 PM Ahmet Altay  wrote:
>>
>>> Anton, any updates on this release? Do you need help?
>>>
>>> On Fri, Jun 28, 2019 at 11:42 AM Anton Kedin  wrote:
>>>
 I have been running validation builds (had some hiccups with that),
 everything looks mostly good, except failures in `:beam-test-tools` and
 `:io:aws`. Now I will start cherry-picking other fixes and trying to figure
 the specific issues out.

 Regards,
 Anton

 On Fri, Jun 21, 2019 at 3:17 PM Anton Kedin  wrote:

> Not much progress today. Debugging build issues when running global
> `./gradlew build -PisRelease --scan`
>
> Regards,
> Anton
>
> On Thu, Jun 20, 2019 at 4:12 PM Anton Kedin  wrote:
>
>> Published the snapshots, working through the
>> verify_release_validation script
>>
>> Got another blocker to be cherry-picked when merged:
>> https://issues.apache.org/jira/browse/BEAM-7603
>>
>> Regards,
>> Anton
>>
>>
>> On Wed, Jun 19, 2019 at 4:17 PM Anton Kedin  wrote:
>>
>>> I have cut the release branch for 2.14.0 and working through the
>>> release process. Next step is building the snapshot and release branch
>>> verification.
>>>
>>> There are two issues [1] that are still not resolved that are marked
>>> as blockers at the moment:
>>>  * [2] BEAM-7478 - remote cluster submission from Flink Runner
>>> broken;
>>>  * [3] BEAM-7424 - retries for GCS;
>>>
>>> [1]
>>> https://issues.apache.org/jira/browse/BEAM-7478?jql=project%20%3D%20BEAM%20%20AND%20fixVersion%20%3D%202.14.0%20AND%20status%20!%3D%20Closed%20AND%20status%20!%3DResolved
>>> [2] https://issues.apache.org/jira/browse/BEAM-7478
>>> [3] https://issues.apache.org/jira/browse/BEAM-7424
>>>
>>> Regards,
>>> Anton
>>>
>>


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread rahul patwari
Hi Kenn,

If we consider the following two *Unbounded* PCollections:
- PCollection1 => [*Non-Global* Window with Default Trigger]
- PCollection2 => [Global Window with *Non-Default* Trigger] :)
coincidentally turned out to be the opposite

Joining these two PCollections in BeamSql currently is not possible because
of https://jira.apache.org/jira/browse/BEAM-3345 (WindowFn Mismatch).
But in this case, PCollection1 can be joined with PCollection2 using
SideInputJoin (
https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
which is what is done for joining an Unbounded PCollection with a Bounded
PCollection. I am thinking that Beam can guarantee it joins all input
elements once per window for this case.
The result of the join might be fuzzy for the window in which the Trigger for
PCollection2 fires and the side input gets loaded into memory.

PCollection2 can be considered as a Slowly Changing Lookup Cache, and BeamSql
can support the pattern:
https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
which is currently not possible.
I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
to natively support PCollectionView so that BeamSql supports "Slowly
Updating Global Window Sideinput Pattern" using SqlTransform's
TableProvider.
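
For reference, a rough Java sketch of that pattern feeding a hand-rolled SideInputJoin
(standard Beam imports omitted; p, mainStream, and readLookupTable() are placeholders,
and mainStream is assumed to be a PCollection<String> of lookup keys):

// Slowly changing lookup cache as a side input, refreshed every 5 minutes.
PCollectionView<Map<String, String>> lookup = p
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(Window.<Long>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes())
    .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        c.output(readLookupTable());  // placeholder: re-read the slowly changing table
      }
    }))
    .apply(Latest.globally())
    .apply(View.asSingleton());

// "SideInputJoin": the main stream keeps its own non-global windows and default trigger.
PCollection<KV<String, String>> joined = mainStream
    .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        Map<String, String> table = c.sideInput(lookup);
        c.output(KV.of(c.element(), table.get(c.element())));
      }
    }).withSideInputs(lookup));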

If we can support this, User will be able to do:
PCollection<Row> mainStream = ...;
PCollection<Row> lookupStream = ...;
PCollectionTuple tuple = PCollectionTuple
    .of(new TupleTag<Row>("MainTable"), mainStream)
    .and(new TupleTag<Row>("LookupTable"), lookupStream);
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

Can we implement SideInputJoin for this case?
I might be wrong in my understanding. Please let me know your thoughts.

Thanks,
Rahul

On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles  wrote:

> I think the best way to approach this is probably to have an example SQL
> statement and to discuss what the relational semantics should be.
>
> Windowing is not really part of SQL (yet) and in a way it just needs very
> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
> proposal for SQL, windowed aggregation is explicitly part of the GROUP
> BY operation, where you GROUP BY window columns that were added. So it is
> more explicit than in Beam. Relations do not have a WindowFn so there is no
> problem of them being incompatible.
>
> With Beam SQL there are basically two ways of windowing that work totally
> differently:
>
> 1. SQL style windowing where you GROUP BY windows. This does not use the
> input PCollection windowfn
> 2. PCollection windowing where the SQL does not do any windowing - this
> should apply the SQL expression to each window independently
>
> In order to support a hybrid of these, it might be:
>
> 3. SQL style windowing, where, when a PCollection has a window assigned, the
> window columns are added before the SQL is applied. It is a bit strange but
> might enable your use.
>
> Kenn
>
> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari 
> wrote:
>
>> Hi,
>>
>> Beam currently doesn't support Join of Unbounded PCollections of
>> different WindowFns (
>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>> ).
>>
>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>
>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
>> one of the Unbounded PCollection has [GlobalWindows Applied with
>> Non-Default Trigger(probably a slow-changing lookup cache
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>> by performing 'SideInputJoin'?
>>
>> Regards,
>> Rahul
>>
>


Re: [PROPOSAL] Revised streaming extensions for Beam SQL

2019-07-25 Thread Kenneth Knowles
We hope it does enter the SQL standard. It is one reason for coming
together to write this paper.

OVER clause is mentioned often.

 - TUMBLE can actually just be a function so you don't need OVER or any of
the fancy stuff we propose; it is just done to make them all look similar
 - HOP still doesn't work since the OVER clause has one value per input row; it
is still a 1-to-1 input/output ratio
 - SESSION GAP 5 MINUTES (PARTITION BY key) is actually a natural syntax
that could work well

None of them require ORDER, by design.
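
For concreteness, a rough side-by-side of the two styles; the schema and field names
are invented, and the table-function form below is only an approximation of the syntax
proposed in the paper, not something Beam SQL supports today:

// Today's windowing-in-GROUP-BY style (Beam SQL / Calcite streaming extensions),
// applied to a PCollection<Row> named `input` whose schema has an event timestamp field.
PCollection<Row> hourlyCounts = input.apply(
    SqlTransform.query(
        "SELECT userId, COUNT(*) AS cnt "
            + "FROM PCOLLECTION "
            + "GROUP BY userId, TUMBLE(eventTimestamp, INTERVAL '1' HOUR)"));

// Approximate shape of the proposed replacement (a table-valued function plus an
// ordinary GROUP BY over the added window columns):
//
//   SELECT userId, COUNT(*) AS cnt
//   FROM TABLE(TUMBLE(TABLE PCOLLECTION, DESCRIPTOR(eventTimestamp), INTERVAL '1' HOUR))
//   GROUP BY userId, window_start, window_end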

On the other hand, implementing the general OVER clause and the rank,
running sum, etc, could be done with GBK + sort values. That is not related
to windowing. And since in SQL users of windowing will think of OVER as
related to ordering, I personally don't want to also use it for something
that has nothing to do with ordering.

But if you would write up something that could be interesting to discuss
more.

Kenn

On Wed, Jul 24, 2019 at 2:24 PM Mingmin Xu  wrote:

> +1 to remove those magic words in Calcite streaming SQL, just because
> they're not SQL standard. The idea to replace HOP/TUMBLE with
> table-view-functions makes it concise; my only question is: is it (or will
> it be) part of the SQL standard? --I'm a big fan of aligning with standards :lol
>
> Ps, although the concept of `window` used here is different from the window
> function in SQL, the syntax gives some insight. Take the example of
> `ROW_NUMBER() OVER (PARTITION BY COL1 ORDER BY COL2) AS row_number`:
> `ROW_NUMBER()` assigns a sequence value to records in the subgroup with key
> 'COL1'. We can introduce another function, like TUMBLE(), which will assign
> a window instance (more instances for HOP()) to the record.
>
> Mingmin
>
>
> On Sun, Jul 21, 2019 at 9:42 PM Manu Zhang 
> wrote:
>
>> Thanks Kenn,
>> great paper and left some newbie questions on the proposal.
>>
>> Manu
>>
>> On Fri, Jul 19, 2019 at 1:51 AM Kenneth Knowles  wrote:
>>
>>> Hi all,
>>>
>>> I recently had the great privilege to work with others from Beam plus
>>> Calcite and Flink SQL contributors to build a new and minimal proposal for
>>> adding streaming extensions to standard SQL: event time, watermarks,
>>> windowing, triggers, stream materialization.
>>>
>>> We hope this will influence the standard body and also Calcite and Flink
>>> and other projects working on the streaming SQL.
>>>
>>> I would like to start implementing these extensions in Beam, moving from
>>> our current streaming extensions to the new proposal.
>>>
>>>The whole paper is https://arxiv.org/abs/1905.12133
>>>
>>>My small proposal to start in Beam:
>>> https://s.apache.org/streaming-beam-sql
>>>
>>> TL;DR: replace `GROUP BY Tumble/Hop/Session` with table functions that
>>> do Tumble, Hop, Session. The details of why to make this change are
>>> explained in the appendix to my proposal. For the big picture of how it
>>> fits in, the full paper is best.
>>>
>>> Kenn
>>>
>>
>
> --
> 
> Mingmin
>


Re: How to expose/use the External transform on Java SDK

2019-07-25 Thread Kenneth Knowles
Top posting just to address the vendoring question. We didn't have
vendoring of gRPC until very recently. I think all rationale about keeping
it off the SDK surface is obsolete now. It will probably unlock a lot of
simplification to just go for it and use gRPC in the core SDK. Notably,
runners-core-construction-java is separate for only this reason, and it is
a pretty confusing distinction* to get right sometimes. But I could be
missing another pitfall. For example we wouldn't just undo all the API
decisions in runners-core-construction-java, because we still want to
isolate the user from gRPC when it is not relevant to them, and we also
want transform authors to be able to register payload translators.

Kenn

 * sdks-java-core versus runners-core-construction-java is all
construction-time bits and you have to think about whether it is
user-facing or runner-facing / whether it depends on proto
 * runners-core-construction-java versus runners-core-java is all
runner/harness-facing but you have to think about whether it is
construction-time or run-time

On Thu, Jul 25, 2019 at 9:45 AM Ismaël Mejía  wrote:

> It seems we mostly agree that this is a ‘core’ feature, and it already
> is at least in the other SDKs (python and go). So if its place is in
> sdks/java/core then the correct path might be #1.
>
> Robert, where did the discussion about merging transform translation
> happen (I probably missed it)? That could be an extra point
> in deciding to do this.
>
> I thought that gRPC leaked more stuff, so that's better than I expected,
> and vendoring helps with possible version conflicts. The bad part is that it
> is still 14MB, so this is more stuff to stage and bigger fat jars for
> deployments in the open source runners. Not to mention the potential
> repeated versions that could arise from runner repackaging (as we
> lived with guava in the past).
>
> The other thing I feel weird about is that this feels a bit like
> mixing the execution part with the definition part that is something
> that I really appreciated of the current sdks/java/core separation, at
> least in Java.
>
> Is there a way to include just the generated clients for a gRPC
> service and not the full gRPC stuff (side note: I am still a gRPC newbie)?
> That could make it at least a bit more constrained (even if
> still mixing runtime with definition).
>
> On Thu, Jul 25, 2019 at 10:20 AM Robert Bradshaw 
> wrote:
> >
> > From the portability perspective,
> >
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto
> > and the associated services for executing pipelines is about as "core"
> > as it gets, and eventually I'd like to see all runners being portable
> > (even if they have an option of running user code in process rather
> > than in separate docker images) and the API between SDKs and Runners
> > would be these beam model protos rather than some parallel set of Java
> > classes. This would argue for #1. (There was also some discussion
> > recently about merging the transform translation into core as well, as
> > the current structure of keeping it separate introduces a lot of extra
> > hoops and makes it difficult to define user-level transforms that have
> > proper translation, which is along the same lines.)
> >
> > I'm not quite sure I follow the downsides of leaking the vendored
> > classes into the users classpath--isn't the point of vendoring to make
> > such exposure benign (and as you'd almost always be linking in a
> > runner, you'd get this anyway).
> >
> > Finally, from a simple user's API perspective, having
> > ExternalTransform in core makes a lot of sense and it'd be unfortunate
> > to contort the API for underlying technical reasons if it can be
> > avoided.
> >
> > On Wed, Jul 24, 2019 at 9:18 PM Heejong Lee  wrote:
> > >
> > > I think it depends how we define "the core" part of the SDK. If we
> define the core as only the (abstract) data types which describe BEAM
> pipeline model then it would be more sensible to put external transform
> into a separate extension module (option 4). Otherwise, option 1 makes
> sense.
> > >
> > > On Wed, Jul 24, 2019 at 11:56 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> > >>
> > >> The idea of 'ExternalTransform' is to allow users to use transforms
> in SDK X from SDK Y. I think this should be a core part of each SDK and
> corresponding external transforms ([a] for Java, [b] for Python) should be
> released with each SDK. This will also allow us to add core external
> transforms to some of the critical transforms that are not available in
> certain SDKs. So I prefer option (1).
> > >>
> > >> Rebo, I didn't realize there's an external transform in Go SDK.
> Looking at it, seems like it's more of an interface for native transforms
> implemented in each runner, not for cross-language use-cases. Is that
> correct? Maybe we can reuse it for the latter as well.
> > >>
> > >> Thanks,
> > >> Cham
> > >>
> > >> [a]
> https://github.com/apache

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Yes. But GroupIntoBatches works on KV<K, V>. We are working on
PCollection<Row> throughout our pipeline.
We can convert Row to KV<K, Row>. But we only have a few keys and a Bounded
PCollection. As we have Global windows and a few keys, the opportunity for
parallelism is limited to [No. of keys] with Stateful ParDo [per Key, Per
Window] Processing.
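
For what it's worth, a rough sketch of spreading the Rows over a fixed set of synthetic
keys so GroupIntoBatches can run with more parallelism than [No. of keys]; standard Beam
imports omitted, and NUM_SHARDS, rows, and callRpc() are placeholders, not existing APIs:

final int NUM_SHARDS = 50;  // assumed tuning knob

PCollection<KV<Integer, Iterable<Row>>> batches = rows
    // Assign a synthetic shard key; the batching here does not care about the real key.
    .apply(WithKeys.of((Row r) -> ThreadLocalRandom.current().nextInt(NUM_SHARDS))
        .withKeyType(TypeDescriptors.integers()))
    .apply(GroupIntoBatches.<Integer, Row>ofSize(100));

batches.apply(ParDo.of(new DoFn<KV<Integer, Iterable<Row>>, Row>() {
  @ProcessElement
  public void process(ProcessContext c) {
    for (Row result : callRpc(c.element().getValue())) {  // placeholder batch RPC
      c.output(result);
    }
  }
}));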

On Thu, Jul 25, 2019 at 10:08 PM Reuven Lax  wrote:

> Have you looked at the GroupIntoBatches transform?
>
> On Thu, Jul 25, 2019 at 9:34 AM rahul patwari 
> wrote:
>
>> So, If an RPC call has to be performed for a batch of
>> Rows(PCollection), instead of each Row, the recommended way is to
>> batch the Rows in startBundle() of DoFn(
>> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
>> I thought Stateful and Timely Processing could be helpful here.
>>
>> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw 
>> wrote:
>>
>>> Though it's not obvious in the name, Stateful ParDos can only be
>>> applied to keyed PCollections, similar to GroupByKey. (You could,
>>> however, assign every element to the same key and then apply a
>>> Stateful DoFn, though in that case all elements would get processed on
>>> the same worker.)
>>>
>>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>>  wrote:
>>> >
>>> > Hi,
>>> >
>>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> gives an example of assigning an arbitrary-but-consistent index to each
>>> element on a per key-and-window basis.
>>> >
>>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>>> PCollection with Fixed Windows, the state is maintained per window and
>>> every element in the window will be assigned a consistent index?
>>> > Does this mean every element belonging to the window will be processed
>>> in a single DoFn Instance, which otherwise could have been done in multiple
>>> parallel instances, limiting performance?
>>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>>> PCollection?
>>> >
>>> > Thanks,
>>> > Rahul
>>>
>>


Re: Choosing a coder for a class that contains a Row?

2019-07-25 Thread Brian Hulette
I know Reuven has put some thought into evolving schemas, but I'm not sure
it's documented anywhere as of now. The only documentation I've come across
as I bump around the schema code are some comments deep in RowCoder [1].
Essentially the current serialization format for a row includes a row count
as a prefix so we can detect "simple" schema changes like column additions
and deletions. When decoding a Row, if the current schema contains
*more* fields
than the encoded Row, the remaining fields are populated with nulls in the
resulting Row object. If the current schema contains *fewer* fields than
the encoded Row, the additional ones are just dropped.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
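
As a small illustration of that decode behaviour, using the public Schema/Row/RowCoder
APIs (a sketch only: the added field must be nullable, the checked CoderException is
ignored here, and the exact behaviour can vary by Beam version):

Schema v1 = Schema.builder()
    .addStringField("id")
    .addInt64Field("amount")
    .build();

// v2 adds a nullable column; the encoded field count lets the decoder spot the difference.
Schema v2 = Schema.builder()
    .addStringField("id")
    .addInt64Field("amount")
    .addNullableField("comment", Schema.FieldType.STRING)
    .build();

Row oldRow = Row.withSchema(v1).addValues("abc", 10L).build();
byte[] bytes = CoderUtils.encodeToByteArray(RowCoder.of(v1), oldRow);

// Decoding with the newer schema: per the description above, the missing "comment"
// field comes back as null; decoding with a narrower schema would drop extra fields.
Row upgraded = CoderUtils.decodeFromByteArray(RowCoder.of(v2), bytes);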

On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba  wrote:

> I'm also really interested in the question of evolving schemas... It's
> something I've also put off figuring out :D
>
> With all its warts, the LazyAvroCoder technique (a coder backed by
> some sort of schema registry) _could_ work with "homogeneish" data
> (i.e. if the number of schemas in play for a single coder is much,
> much smaller than the number of elements), even if none of the the
> schemas are known at Pipeline construction.  The portability job
> server (which already stores and serves artifacts for running jobs)
> might be the right place to put a schema registry... but I'm not
> entirely convinced it's the right way to go either.
>
> At the same time, "simply" bumping a known schema to a new version is
> roughly equivalent to updating a pipeline in place.
>
> Sending the data as Java-serialized Rows will be equivalent to sending
> the entire schema with every record, so it _would_ work without
> involving a new, distributed state between one coder's encode and
> another's decode (at the cost of message size, of course).
>
> Ryan
>
>
> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada  wrote:
> >
> > +dev
> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
> useful.
> >
> > The data is change data capture from databases, and I'm putting it into
> a Beam Row. The schema for the Row is generally homogeneous, but subject to
> change at some point in the future if the schema in the database changes.
> It's unusual and unlikely, but possible. I have no idea how Beam deals with
> evolving schemas. +Reuven Lax is there documentation / examples / anything
> around this? : )
> >
> > I think evolving schemas is an interesting question
> >
> > For now, I am going to Java-serialize the objects, and delay figuring
> this out. But I reckon I'll have to come back to this...
> >
> > Best
> > -P.
> >
> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:
> >>
> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
> >> pipeline construction time, but can be discovered from the instance of
> >> MyData?
> >>
> >> Once discovered, is the schema "homogeneous" for all instance of
> >> MyData?  (i.e. someRow will always have the same schema for all
> >> instances afterwards, and there won't be another someRow with a
> >> different schema).
> >>
> >> We've encountered a parallel "problem" with pure Avro data, where the
> >> instance is a GenericRecord containing its own Avro schema but
> >> *without* knowing the schema until the pipeline is run.  The solution
> >> that we've been using is a bit hacky, but we're using an ad hoc
> >> per-job schema registry and a custom coder where each worker saves the
> >> schema in the `encode` before writing the record, and loads it lazily
> >> in the `decode` before reading.
> >>
> >> The original code is available[1] (be gentle, it was written with Beam
> >> 0.4.0-incubating... and has continued to work until now).
> >>
> >> In practice, the ad hoc schema registry is just a server socket in the
> >> Spark driver, in-memory for DirectRunner / local mode, and a
> >> read/write to a known location in other runners.  There are definitely
> >> other solutions with side-inputs and providers, and the job server in
> >> portability looks like an exciting candidate for per-job schema
> >> registry story...
> >>
> >> I'm super eager to see if there are other ideas or a contribution we
> >> can make in this area that's "Beam Row" oriented!
> >>
> >> Ryan
> >>
> >> [1]
> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
> >>
> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada 
> wrote:
> >> >
> >> > Hello all,
> >> > I am writing a utility to push data to PubSub. My data class looks
> something like so:
> >> > ==
> >> > class MyData {
> >> >   String someId;
> >> >   Row someRow;
> >> >   Row someOtherRow;
> >> > }
> >> > ==
> >> > The schema for the Rows is not known a-priori. It is contained by the
> Row. I am then pushing this data to pubsub:
> >> > ===
> >> > MyData pushingData = 
> >> > 

Re: How to expose/use the External transform on Java SDK

2019-07-25 Thread Ismaël Mejía
It seems we mostly agree that this is a ‘core’ feature, and it already
is at least in the other SDKs (python and go). So if its place is in
sdks/java/core then the correct path might be #1.

Robert, where did the discussion about merging transform translation
happen (I probably missed it)? That could be an extra point
in deciding to do this.

I thought that gRPC leaked more stuff, so that's better than I expected,
and vendoring helps with possible version conflicts. The bad part is that it
is still 14MB, so this is more stuff to stage and bigger fat jars for
deployments in the open source runners. Not to mention the potential
repeated versions that could arise from runner repackaging (as we
lived with guava in the past).

The other thing I feel weird about is that this feels a bit like
mixing the execution part with the definition part that is something
that I really appreciated of the current sdks/java/core separation, at
least in Java.

Is there a way to include just the generated clients for a gRPC
service and not the full gRPC stuff (side note: I am still a gRPC newbie)?
That could make it at least a bit more constrained (even if
still mixing runtime with definition).

On Thu, Jul 25, 2019 at 10:20 AM Robert Bradshaw  wrote:
>
> From the portability perspective,
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto
> and the associated services for executing pipelines is about as "core"
> as it gets, and eventually I'd like to see all runners being portable
> (even if they have an option of running user code in process rather
> than in separate docker images) and the API between SDKs and Runners
> would be these beam model protos rather than some parallel set of Java
> classes. This would argue for #1. (There was also some discussion
> recently about merging the transform translation into core as well, as
> the current structure of keeping it separate introduces a lot of extra
> hoops and makes it difficult to define user-level transforms that have
> proper translation, which is along the same lines.)
>
> I'm not quite sure I follow the downsides of leaking the vendored
> classes into the users classpath--isn't the point of vendoring to make
> such exposure benign (and as you'd almost always be linking in a
> runner, you'd get this anyway).
>
> Finally, from a simple user's API perspective, having
> ExternalTransform in core makes a lot of sense and it'd be unfortunate
> to contort the API for underlying technical reasons if it can be
> avoided.
>
> On Wed, Jul 24, 2019 at 9:18 PM Heejong Lee  wrote:
> >
> > I think it depends how we define "the core" part of the SDK. If we define 
> > the core as only the (abstract) data types which describe BEAM pipeline 
> > model then it would be more sensible to put external transform into a 
> > separate extension module (option 4). Otherwise, option 1 makes sense.
> >
> > On Wed, Jul 24, 2019 at 11:56 AM Chamikara Jayalath  
> > wrote:
> >>
> >> The idea of 'ExternalTransform' is to allow users to use transforms in SDK 
> >> X from SDK Y. I think this should be a core part of each SDK and 
> >> corresponding external transforms ([a] for Java, [b] for Python) should be 
> >> released with each SDK. This will also allow us to add core external 
> >> transforms to some of the critical transforms that are not available in 
> >> certain SDKs. So I prefer option (1).
> >>
> >> Rebo, I didn't realize there's an external transform in Go SDK. Looking at 
> >> it, seems like it's more of an interface for native transforms implemented 
> >> in each runner, not for cross-language use-cases. Is that correct? Maybe
> >> we can reuse it for the latter as well.
> >>
> >> Thanks,
> >> Cham
> >>
> >> [a] 
> >> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
> >> [b] 
> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py
> >>
> >> On Wed, Jul 24, 2019 at 10:25 AM Robert Burke  wrote:
> >>>
> >>> Ideas inline.
> >>>
> >>> On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:
> 
>  After Beam Summit EU I was curious about the External transform. I was
>  interested on the scenario of using it to call python code in the
>  middle of a Java pipeline. This is a potentially useful scenario for
>  example to evaluate models from python ML frameworks on Java
>  pipelines. In my example I did a transform to classify elements in a
>  simple Python ParDo and tried to connect it via the Java External
>  transform.
> 
>  I found that the ExternalTransform code was added into
>  `runners/core-construction-java` as part of BEAM-6747 [1]. However
>  this code is not exposed currently as part of the Beam Java SDK, so
>  end users won’t be able to find it easily. I found this weird and
>  thought well it will be as simple as to move it into the Java SDK and
>  voila!

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Reuven Lax
Have you looked at the GroupIntoBatches transform?

On Thu, Jul 25, 2019 at 9:34 AM rahul patwari 
wrote:

> So, If an RPC call has to be performed for a batch of
> Rows(PCollection), instead of each Row, the recommended way is to
> batch the Rows in startBundle() of DoFn(
> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
> I thought Stateful and Timely Processing could be helpful here.
>
> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw 
> wrote:
>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> gives an example of assigning an arbitrary-but-consistent index to each
>> element on a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>> PCollection with Fixed Windows, the state is maintained per window and
>> every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed
>> in a single DoFn Instance, which otherwise could have been done in multiple
>> parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>> PCollection?
>> >
>> > Thanks,
>> > Rahul
>>
>


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
So, if an RPC call has to be performed for a batch of
Rows (PCollection<Row>), instead of each Row, the recommended way is to
batch the Rows in startBundle() of DoFn(
https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
I thought Stateful and Timely Processing could be helpful here.

On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:

> Though it's not obvious in the name, Stateful ParDos can only be
> applied to keyed PCollections, similar to GroupByKey. (You could,
> however, assign every element to the same key and then apply a
> Stateful DoFn, though in that case all elements would get processed on
> the same worker.)
>
> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>  wrote:
> >
> > Hi,
> >
> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives
> an example of assigning an arbitrary-but-consistent index to each element
> on a per key-and-window basis.
> >
> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
> PCollection with Fixed Windows, the state is maintained per window and
> every element in the window will be assigned a consistent index?
> > Does this mean every element belonging to the window will be processed
> in a single DoFn Instance, which otherwise could have been done in multiple
> parallel instances, limiting performance?
> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
> PCollection?
> >
> > Thanks,
> > Rahul
>


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
Though it's not obvious in the name, Stateful ParDos can only be
applied to keyed PCollections, similar to GroupByKey. (You could,
however, assign every element to the same key and then apply a
Stateful DoFn, though in that case all elements would get processed on
the same worker.)
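
A rough sketch of that single-key workaround, just to make the trade-off concrete
(input is an assumed PCollection<String>; standard Beam imports omitted):

PCollection<KV<Integer, String>> indexed = input
    // Force everything onto one key: correct, but serializes all work onto one worker.
    .apply(WithKeys.of("singleton").withKeyType(TypeDescriptors.strings()))
    .apply(ParDo.of(new DoFn<KV<String, String>, KV<Integer, String>>() {
      @StateId("index")
      private final StateSpec<ValueState<Integer>> indexSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("index") ValueState<Integer> index) {
        int current = index.read() == null ? 0 : index.read();
        c.output(KV.of(current, c.element().getValue()));
        index.write(current + 1);
      }
    }));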

On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
 wrote:
>
> Hi,
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an 
> example of assigning an arbitrary-but-consistent index to each element on a 
> per key-and-window basis.
>
> If the Stateful ParDo is applied on a Non-Keyed PCollection, say, 
> PCollection with Fixed Windows, the state is maintained per window and 
> every element in the window will be assigned a consistent index?
> Does this mean every element belonging to the window will be processed in a 
> single DoFn Instance, which otherwise could have been done in multiple 
> parallel instances, limiting performance?
> Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>
> Thanks,
> Rahul


Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Hi,

https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an
example of assigning an arbitrary-but-consistent index to each element on a
per key-and-window basis.

If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
PCollection with Fixed Windows, the state is maintained per window and
every element in the window will be assigned a consistent index?
Does this mean every element belonging to the window will be processed in a
single DoFn Instance, which otherwise could have been done in multiple
parallel instances, limiting performance?
Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?

Thanks,
Rahul


Re: Sort Merge Bucket - Action Items

2019-07-25 Thread Gleb Kanterov
What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
away in favor of SDF, or are we always going to have both?

I was looking into AvroIO.read and AvroIO.readAll; both of them
use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
ReadAllViaFileBasedSource, I find it not necessary to use Source: it
should be enough to have something like (KV<ReadableFile, OffsetRange>,
OutputReceiver<T>), as we have discussed in this thread, and that should be
fine for SMB as well. It would require duplicating code from AvroSource,
but in the end, I don't see it as a problem if AvroSource is going away.
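
Roughly the shape that suggests, sketched as a plain DoFn (readAvroRange() is a
made-up helper standing in for the record-reading code that would be duplicated
from AvroSource):

class ReadAvroRangeFn extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, GenericRecord> {
  @ProcessElement
  public void process(
      @Element KV<FileIO.ReadableFile, OffsetRange> fileAndRange,
      OutputReceiver<GenericRecord> out) throws IOException {
    try (SeekableByteChannel channel = fileAndRange.getKey().openSeekable()) {
      // Hypothetical helper: emit records from blocks whose start offset falls in the range.
      for (GenericRecord record : readAvroRange(channel, fileAndRange.getValue())) {
        out.output(record);
      }
    }
  }
}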

I'm attaching a small diagram I put for myself to better understand the
code.

AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->

FileIO.matchAll :: PTransform<PCollection<String>,
PCollection<MatchResult.Metadata>>
FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
PCollection<ReadableFile>>
AvroIO.readFiles :: PTransform<PCollection<ReadableFile>,
PCollection<T>> ->

ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
PCollection<T>> ->

ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)

Reshuffle.viaRandomKey()

ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where

createSource :: String -> FileBasedSource<T>

createSource = AvroSource


AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, PCollection<T>> ->

Read.Bounded.from(createSource) where

createSource :: String -> FileBasedSource<T>

createSource = AvroSource


Gleb


On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw  wrote:

> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
> >
> > From the peanut gallery, keeping a separate implementation for SMB seems
> fine. Dependencies are serious liabilities for both upstream and
> downstream. It seems like the reuse angle is generating extra work, and
> potentially making already-complex implementations more complex, instead of
> helping things.
>
> +1
>
> To be clear, what I care about is that WriteFiles(X) and
> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> TFRecord, ...}. In other words composability of the API (vs. manually
> filling out the matrix). If WriteFiles and WriteSmbFiles find
> opportunities for (easy, clean) implementation sharing, that'd be
> nice, but not the primary goal.
>
> (Similarly for reading, though that seems less obvious. Certainly
> whatever T is useful for ReadSmb(T) could be useful for a
> (non-liquid-sharding) ReadAll(T) however.)
>
> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li 
> wrote:
> >>
> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> >> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
> >>
> >> Seems the only option is duplicating some logic like temp file
> handling, which is exactly what we did in the original PR.
> >> I can reuse Compression & Sink for file level writes but that seems
> about the most I can reuse right now.
> >>
> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li 
> wrote:
> >>>
> >>> So I spent one afternoon trying some ideas for reusing the last few
> transforms in WriteFiles.
> >>>
> >>> WriteShardsIntoTempFilesFn extends DoFn,
> Iterable>, FileResult>
> >>> => GatherResults extends PTransform,
> PCollection>>
> >>> => FinalizeTempFileBundles extends
> PTransform>>,
> WriteFilesResult>
> >>>
> >>> I replaced FileResult with KV
> so I can use pre-computed SMB destination file names for the transforms.
> >>> I'm also thinking of parameterizing ShardedKey for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
> >>>
> >>> OTOH they are somewhat coupled with the package private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> temp file handling logic lives). Might be hard to decouple either modifying
> existing code or creating new transforms, unless if we re-write most of
> FileBasedSink from scratch.
> >>>
> >>> Let me know if I'm on the wrong track.
> >>>
> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
> >>>
> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> 
> 
> 
>  On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw 
> wrote:
> >
> > On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
> > >
> > > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
> rober...@google.com> wrote:
> > >>
> > >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li 
> wrote:
> > >> >
> > >> > Thanks Robert. Agree with the FileIO point. I'll look into it
> and see what needs to be done.
> > >> >
> > >> > Eugene pointed out that we shouldn't build on
> FileBased{Source,Sink}. So for writes I'll probably build on top of
> WriteFiles.
> > >>
> > >> Meaning it could be parameterized by FileIO.Sink, right?
> > >>
> > >>
> https

Re: Sort Merge Bucket - Action Items

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>
> From the peanut gallery, keeping a separate implementation for SMB seems 
> fine. Dependencies are serious liabilities for both upstream and downstream. 
> It seems like the reuse angle is generating extra work, and potentially 
> making already-complex implementations more complex, instead of helping 
> things.

+1

To be clear, what I care about is that WriteFiles(X) and
WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
TFRecord, ...}. In other words composability of the API (vs. manually
filling out the matrix). If WriteFiles and WriteSmbFiles find
opportunities for (easy, clean) implementation sharing, that'd be
nice, but not the primary goal.

(Similarly for reading, though that seems less obvious. Certainly
whatever T is useful for ReadSmb(T) could be useful for a
(non-liquid-sharding) ReadAll(T) however.)
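
A rough illustration of sharing the same X: AvroIO.sink(), FileSystems.create(), and
the FileIO.Sink methods below are existing Beam APIs, while MyEvent, the output path,
and sortedBucketRecords are hypothetical SMB-side glue:

// One "X" (an Avro FileIO.Sink) that WriteFiles already knows how to drive,
// driven by hand the way a hypothetical WriteSmbFiles could drive it per bucket.
FileIO.Sink<MyEvent> sink = AvroIO.sink(MyEvent.class);  // MyEvent: placeholder Avro class

ResourceId bucketFile = FileSystems.matchNewResource(
    "gs://bucket/output/bucket-00000-of-00004.avro", /* isDirectory */ false);

try (WritableByteChannel channel = FileSystems.create(bucketFile, MimeTypes.BINARY)) {
  sink.open(channel);
  for (MyEvent record : sortedBucketRecords) {  // assumed: already bucketed and sorted
    sink.write(record);
  }
  sink.flush();
}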

> On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:
>>
>> I spoke too soon. Turns out for unsharded writes, numShards can't be 
>> determined until the last finalize transform, which is again different from 
>> the current SMB proposal (static number of buckets & shards).
>> I'll end up with more code specialized for SMB in order to generalize 
>> existing sink code, which I think we all want to avoid.
>>
>> Seems the only option is duplicating some logic like temp file handling, 
>> which is exactly what we did in the original PR.
>> I can reuse Compression & Sink for file level writes but that seems about 
>> the most I can reuse right now.
>>
>> On Tue, Jul 23, 2019 at 6:36 PM Neville Li  wrote:
>>>
>>> So I spent one afternoon trying some ideas for reusing the last few
>>> transforms in WriteFiles.
>>>
>>> WriteShardsIntoTempFilesFn extends DoFn, 
>>> Iterable>, FileResult>
>>> => GatherResults extends PTransform, 
>>> PCollection>>
>>> => FinalizeTempFileBundles extends 
>>> PTransform>>, 
>>> WriteFilesResult>
>>>
>>> I replaced FileResult with KV so I 
>>> can use pre-computed SMB destination file names for the transforms.
>>> I'm also thinking of parameterizing ShardedKey for SMB's 
>>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are 
>>> private and easy to change/pull out.
>>>
>>> OTOH they are somewhat coupled with the package private 
>>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of 
>>> temp file handling logic lives). Might be hard to decouple either modifying
>>> existing code or creating new transforms, unless if we re-write most of 
>>> FileBasedSink from scratch.
>>>
>>> Let me know if I'm on the wrong track.
>>>
>>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>>
>>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath  
>>> wrote:



 On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw  
 wrote:
>
> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov  
> wrote:
> >
> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw  
> > wrote:
> >>
> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li  
> >> wrote:
> >> >
> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and 
> >> > see what needs to be done.
> >> >
> >> > Eugene pointed out that we shouldn't build on 
> >> > FileBased{Source,Sink}. So for writes I'll probably build on top of 
> >> > WriteFiles.
> >>
> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>
> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >
> > Yeah if possible, parameterize FileIO.Sink.
> > I would recommend against building on top of WriteFiles either. FileIO 
> > being implemented on top of WriteFiles was supposed to be a temporary 
> > measure - the longer-term plan was to rewrite it from scratch (albeit 
> > with a similar structure) and throw away WriteFiles.
> > If possible, I would recommend to pursue this path: if there are parts 
> > of WriteFiles you want to reuse, I would recommend to implement them as 
> > new transforms, not at all tied to FileBasedSink (but ok if tied to 
> > FileIO.Sink), with the goal in mind that FileIO could be rewritten on 
> > top of these new transforms, or maybe parts of WriteFiles could be 
> > swapped out for them incrementally.
>
> Thanks for the feedback. There's a lot that was done, but looking at
> the code it feels like there's a lot that was not yet done either, and
> the longer-term plan wasn't clear (though perhaps I'm just not finding
> the right docs).


 I'm also a bit unfamiliar with original plans for WriteFiles and for 
 updating source interfaces, but I prefer not significantly modifying 
 existing IO transforms to suite the SMB use-case. If there are existing 
 pieces of code that can be easily re-used that is fine, but existing 
 sources/sinks are designed to perform

Re: Write-through-cache in State logic

2019-07-25 Thread Robert Bradshaw
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar  wrote:
>
> Thanks Robert,
>
>  I stumbled on the jira that you created some time ago:
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked code where code changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>
> I am willing to provide help to implement this. Let me know how I can help.

As far as I'm aware, no one is actively working on it right now.
Please feel free to assign yourself the JIRA entry and I'll be happy
to answer any questions you might have if (well probably when) these
pointers are insufficient.
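
For reference, the "blind append" shape being discussed looks roughly like this on the
Java side (element types here are placeholders; the same idea applies to the Python SDK):

class BufferEventsFn extends DoFn<KV<String, String>, Void> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("buffer") BagState<String> buffer) {
    // Blind append: no read of existing state, so an SDK-side write-through cache
    // can batch these mutations and flush them once per bundle instead of issuing
    // a state RPC per element. A clear()-then-add() sequence stays blind as well.
    buffer.add(c.element().getValue());
  }
}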

> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw  wrote:
>>
>> This is documented at
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>> . Note that it requires participation of both the runner and the SDK
>> (though there are no correctness issues if one or the other side does
>> not understand the protocol, caching just won't be used).
>>
>> I don't think it's been implemented anywhere, but could be very
>> beneficial for performance.
>>
>> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>> >
>> > I checked the python sdk[1] and it has similar implementation as Java SDK.
>> >
>> > I would agree with Thomas. In case of high volume event stream and bigger 
>> > cluster size, network call can potentially cause a bottleneck.
>> >
>> > @Robert
>> > I am interested to see the proposal. Can you provide me the link of the 
>> > proposal?
>> >
>> > [1]: 
>> > https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>> >
>> >
>> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>> >>
>> >> Thanks for the pointer. For streaming, it will be important to support 
>> >> caching across bundles. It appears that even the Java SDK doesn't support 
>> >> that yet?
>> >>
>> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>> >>
>> >> Regarding clear/append: It would be nice if both could occur within a 
>> >> single Fn Api roundtrip when the state is persisted.
>> >>
>> >> Thanks,
>> >> Thomas
>> >>
>> >>
>> >>
>> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>> >>>
>> >>> User state is built on top of read, append and clear and not off a read 
>> >>> and write paradigm to allow for blind appends.
>> >>>
>> >>> The optimization you speak of can be done completely inside the SDK 
>> >>> without any additional protocol being required as long as you clear the 
>> >>> state first and then append all your new data. The Beam Java SDK does 
>> >>> this for all runners when executed portably[1]. You could port the same 
>> >>> logic to the Beam Python SDK as well.
>> >>>
>> >>> 1: 
>> >>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>> >>>
>> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  
>> >>> wrote:
>> 
>>  Python workers also have a per-bundle SDK-side cache. A protocol has
>>  been proposed, but hasn't yet been implemented in any SDKs or runners.
>> 
>>  On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>>  >
>>  > It's runner dependent. Some runners (e.g. the Dataflow runner) do 
>>  > have such a cache, though I think it's currently has a cap for large 
>>  > bags.
>>  >
>>  > Reuven
>>  >
>>  > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  
>>  > wrote:
>>  >>
>>  >> Hi,
>>  >>
>>  >> I have been using python sdk for the application and also using 
>>  >> BagState in production. I was wondering whether state logic has any 
>>  >> write-through-cache implemented or not. If we are sending every read 
>>  >> and write request through network then it comes with a performance 
>>  >> cost. We can avoid network call for a read operation if we have 
>>  >> write-through-cache.
>>  >> I have superficially looked into the implementation and I didn't see 
>>  >> any cache implementation.
>>  >>
>>  >> is it possible to have this cache? would it cause any issue if we 
>>  >> have the caching layer?
>>  >>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
>
> Hi Jincheng,
>
> It is very exciting to see this follow-up, that you have done your research 
> on the current state and that there is the intention to join forces on the 
> portability effort!
>
> I have added a few pointers inline.
>
> Several of the issues you identified affect our usage of Beam as well. These 
> present an opportunity for collaboration.

+1, a lot of this aligns with improvements we'd like to make as well.

> On Wed, Jul 24, 2019 at 2:53 AM jincheng sun  wrote:
>>
>> Hi all,
>>
>> Thanks Max and all of your kind words. :)
>>
>> Sorry for the late reply, as I'm busy working on the Flink 1.9 release. For
>> the next major release of Flink, we plan to add Python user-defined
>> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
>> portability framework and think that it is perfect for our requirements.
>> However, we have also found some improvements needed for Beam:
>>
>> Must Have:
>> 
>> 1) Currently only BagState is supported in gRPC protocol and I think we 
>> should support more kinds of state types, such as MapState, ValueState, 
>> ReducingState, CombiningState(AggregatingState in Flink), etc. That's 
>> because these kinds of state will be used in both user-defined function or 
>> Flink Python DataStream API.
>
> There has been discussion about the need for different state types and to 
> efficiently support those on the runner side there may be a need to look at 
> the over the wire representation also.
>
> https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
> https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E

There are two distinct levels at which one can talk about a certain
type of state being supported: the user-visible SDK's API and the
runner's API. For example, BagState, ValueState, ReducingState, and
CombiningState can all be implemented on top of a runner-offered
MapState in the SDK. On the one hand, there's a desire to keep the
number of "primitive" state types to a minimum (to ease the task of
authoring runners), but if a runner can perform a specific
optimization due to knowing about the particular state type, it might
be preferable to pass it through rather than emulate it in the SDK.
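
To make the emulation option concrete, a minimal sketch (not existing Beam code) of
ValueState implemented on top of a runner-offered MapState:

// Hypothetical SDK-side adapter: ValueState<T> emulated over MapState<String, T>.
class ValueViaMapState<T> implements ValueState<T> {
  private static final String KEY = "value";
  private final MapState<String, T> backing;

  ValueViaMapState(MapState<String, T> backing) {
    this.backing = backing;
  }

  @Override
  public void write(T input) {
    backing.put(KEY, input);
  }

  @Override
  public T read() {
    return backing.get(KEY).read();
  }

  @Override
  public ValueState<T> readLater() {
    backing.get(KEY).readLater();  // prefetch hint only
    return this;
  }

  @Override
  public void clear() {
    backing.remove(KEY);
  }
}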

>> 2) There are warnings that Python 3 is not fully supported in Beam 
>> (beam/sdks/python/setup.py). We should support Python 3.x for the beam 
>> portability framework due to Python 2 will be not supported officially.
>
> This must be obsolete per latest comments on: 
> https://issues.apache.org/jira/browse/BEAM-1251
>
>> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at 
>> the runner side. Why I think it's a must-have is because when the
>> environment type is "PROCESS", the default value "/tmp" may become a big 
>> problem.

There are still some issues to be worked out around exactly how
environments are set up (most notably around dependencies that are
external to the docker images, but also things like this).

>> 4) The buffer size configure policy should be improved, such as:
>>At runner side, the buffer limit in BeamFnDataBufferingOutboundObserver 
>> is size based. We should also support time based especially for the 
>> streaming case.
>>At Python SDK Harness, the buffer size is not configurable in 
>> GrpcDataService. The input queue size of the input buffer in Python SDK 
>> Harness is not size limited.
>>   The flush threshold of the output buffer in Python SDK Harness is 10 MB by 
>> default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the 
>> threshold configurable and support time based threshold.

I'm wary of having too many buffer size configuration options (is
there a compelling reason to make it bigger or smaller?), but something
time-based would be very useful.

>> Nice To Have:
>> ---
>> 1) Improves the interfaces of FnDataService, BundleProcessor, ActiveBundle, 
>> etc, to change the parameter type from WindowedValue to T. (We have 
>> already discussed in the previous mails)
>>
>> 2) Refactor the code to avoid unnecessary dependencies being pulled in. For example,
>> beam-sdks-java-core (11MB) is a package for Java SDK users and it is pulled in
>> because there are a few classes in beam-sdks-java-core that are used in
>> beam-runners-java-fn-execution, such as:
>> PipelineOptions used in DefaultJobBundleFactory FileSystems used in 
>> BeamFileSystemArtifactRetrievalService.
>> It means maybe we can add a new module such as beam-sdks-java-common to hold 
>> the classes used by both runner and SDK.
>>
>> 3) State cache is not shared between bundles which is performance critical 
>> for streaming jobs.
>
> This is rather important to address:
>
> https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>
>>
>>
>> 4) The coder of Wi

Re: How to expose/use the External transform on Java SDK

2019-07-25 Thread Robert Bradshaw
From the portability perspective,
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto
and the associated services for executing pipelines is about as "core"
as it gets, and eventually I'd like to see all runners being portable
(even if they have an option of running user code in process rather
than in separate docker images) and the API between SDKs and Runners
would be these beam model protos rather than some parallel set of Java
classes. This would argue for #1. (There was also some discussion
recently about merging the transform translation into core as well, as
the current structure of keeping it separate introduces a lot of extra
hoops and makes it difficult to define user-level transforms that have
proper translation, which is along the same lines.)

I'm not quite sure I follow the downsides of leaking the vendored
classes into the users classpath--isn't the point of vendoring to make
such exposure benign (and as you'd almost always be linking in a
runner, you'd get this anyway).

Finally, from a simple user's API perspective, having
ExternalTransform in core makes a lot of sense and it'd be unfortunate
to contort the API for underlying technical reasons if it can be
avoided.

On Wed, Jul 24, 2019 at 9:18 PM Heejong Lee  wrote:
>
> I think it depends how we define "the core" part of the SDK. If we define the 
> core as only the (abstract) data types which describe BEAM pipeline model 
> then it would be more sensible to put external transform into a separate 
> extension module (option 4). Otherwise, option 1 makes sense.
>
> On Wed, Jul 24, 2019 at 11:56 AM Chamikara Jayalath  
> wrote:
>>
>> The idea of 'ExternalTransform' is to allow users to use transforms in SDK X 
>> from SDK Y. I think this should be a core part of each SDK and corresponding 
>> external transforms ([a] for Java, [b] for Python) should be released with 
>> each SDK. This will also allow us to add core external transforms to some of 
>> the critical transforms that are not available in certain SDKs. So I prefer 
>> option (1).
>>
>> Rebo, I didn't realize there's an external transform in Go SDK. Looking at 
>> it, seems like it's more of an interface for native transforms implemented 
>> in each runner, not for cross-language use-cases. Is that correct? Maybe
>> we can reuse it for the latter as well.
>>
>> Thanks,
>> Cham
>>
>> [a] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
>> [b] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py
>>
>> On Wed, Jul 24, 2019 at 10:25 AM Robert Burke  wrote:
>>>
>>> Ideas inline.
>>>
>>> On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:

 After Beam Summit EU I was curious about the External transform. I was
 interested on the scenario of using it to call python code in the
 middle of a Java pipeline. This is a potentially useful scenario for
 example to evaluate models from python ML frameworks on Java
 pipelines. In my example I did a transform to classify elements in a
 simple Python ParDo and tried to connect it via the Java External
 transform.

 I found that the ExternalTransform code was added into
 `runners/core-construction-java` as part of BEAM-6747 [1]. However
 this code is not exposed currently as part of the Beam Java SDK, so
 end users won’t be able to find it easily. I found this weird and
 thought well it will be as simple as to move it into the Java SDK and
 voila!

 But of course this could not be so easy because this transform calls
 the Expansion service via gRPC and Java SDK does not have (and
 probably should not have) gRPC in its dependencies.
 So my second reflex was to add it into the Java SDK and translate it to a
 generic expansion in all the runners, but this may not make sense because
 the External transform is not part of the runner translation since
 this is part of the Pipeline construction process (as pointed to me by
 Max in a slack discussion).

 So the question is: How do you think this should be exposed to the end 
 users?

 1. Should we add gRPC with all its deps to SDKs Java core? (this of
 course is not nice because we will leak our vendored gRPC and
 friends into users classpath).
>>>
>>> If there's separation between the SDK and the Harness then this makes 
>>> sense. Otherwise the portable harness depends on GRPC at present, doesn't 
>>> it? Presently the Go SDK kicks off the harness, and then carries the GRPC 
>>> dependency (Though that's separable if necessary.)

 2. Should we do the dynamic loading of classes only at runtime if the
 transform is used to avoid the big extra compile dependency (and add
 runners/core-construction-java) as a runtime dependency.
 3. Should we create a ‘shim’ module to hide the gRPC dependency and