applying keyed state on top of stream from co-groupByKey output

2019-07-23 Thread Kiran Hurakadli
Hi All,
I am trying to merge 2 data streams using CoGroupByKey and then applying a
stateful ParDo. The input to each side of the co-group is (word, count). The
full problem is explained on Stack Overflow:
https://stackoverflow.com/questions/57162131/applying-keyed-state-on-top-of-stream-from-co-group-stream
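
A minimal Java sketch of that shape may help frame the question: two
(word, count) streams co-grouped by key, followed by a stateful DoFn that
keeps a per-key running total. The PCollection names, tags, and the
running-total state below are illustrative assumptions, not the pipeline
from the Stack Overflow post.
==
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

class CoGroupThenKeyedState {
  // Hypothetical tags for the two (word, count) inputs.
  static final TupleTag<Long> STREAM_A = new TupleTag<>();
  static final TupleTag<Long> STREAM_B = new TupleTag<>();

  static PCollection<KV<String, Long>> coGroupAndTotal(
      PCollection<KV<String, Long>> countsA, PCollection<KV<String, Long>> countsB) {
    // Merge the two keyed streams; each output element is (word, CoGbkResult).
    PCollection<KV<String, CoGbkResult>> coGrouped =
        KeyedPCollectionTuple.of(STREAM_A, countsA)
            .and(STREAM_B, countsB)
            .apply(CoGroupByKey.create());

    // The CoGroupByKey output is still a KV, so a stateful DoFn can be applied
    // directly; state is scoped to each word (and window).
    return coGrouped.apply(
        ParDo.of(
            new DoFn<KV<String, CoGbkResult>, KV<String, Long>>() {
              @StateId("runningTotal")
              private final StateSpec<ValueState<Long>> totalSpec =
                  StateSpecs.value(VarLongCoder.of());

              @ProcessElement
              public void process(
                  ProcessContext c, @StateId("runningTotal") ValueState<Long> total) {
                Long stored = total.read();
                long sum = stored == null ? 0L : stored;
                for (Long n : c.element().getValue().getAll(STREAM_A)) {
                  sum += n;
                }
                for (Long n : c.element().getValue().getAll(STREAM_B)) {
                  sum += n;
                }
                total.write(sum);
                c.output(KV.of(c.element().getKey(), sum));
              }
            }));
  }
}
==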

Please help me
-- 
Regards,
*Kiran M Hurakadli.*


Re: Choosing a coder for a class that contains a Row?

2019-07-23 Thread Pablo Estrada
+dev 
Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.

The data is change data capture from databases, and I'm putting it into a
Beam Row. The schema for the Row is generally homogeneous, but subject to
change at some point in the future if the schema in the database changes.
It's unusual and unlikely, but possible. I have no idea how Beam deals with
evolving schemas. +Reuven Lax  is there documentation /
examples / anything around this? : )

I think evolving schemas is an interesting question.

For now, I am going to Java-serialize the objects, and delay figuring this
out. But I reckon I'll have to come back to this...
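
A minimal sketch of that interim plan, assuming MyData and its nested Rows
serialize cleanly with plain Java serialization (class and helper names here
are illustrative, not the actual utility):
==
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.values.Row;

// Assumption: MyData is made Serializable, so the Rows ride along via Java
// serialization and carry their schemas with them.
class MyData implements Serializable {
  String someId;
  Row someRow;
  Row someOtherRow;
}

class InterimEncoding {
  // Encode one MyData instance to bytes for the PubSub payload.
  static byte[] encode(MyData data) throws IOException {
    SerializableCoder<MyData> coder = SerializableCoder.of(MyData.class);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    coder.encode(data, os);
    return os.toByteArray();
  }
}
==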

Best
-P.

On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:

> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
> pipeline construction time, but can be discovered from the instance of
> MyData?
>
> Once discovered, is the schema "homogeneous" for all instances of
> MyData?  (i.e. someRow will always have the same schema for all
> instances afterwards, and there won't be another someRow with a
> different schema).
>
> We've encountered a parallel "problem" with pure Avro data, where the
> instance is a GenericRecord containing its own Avro schema but
> *without* knowing the schema until the pipeline is run.  The solution
> that we've been using is a bit hacky, but we're using an ad hoc
> per-job schema registry and a custom coder where each worker saves the
> schema in the `encode` before writing the record, and loads it lazily
> in the `decode` before reading.
>
> The original code is available[1] (be gentle, it was written with Beam
> 0.4.0-incubating... and has continued to work until now).
>
> In practice, the ad hoc schema registry is just a server socket in the
> Spark driver, in-memory for DirectRunner / local mode, and a
> read/write to a known location in other runners.  There are definitely
> other solutions with side-inputs and providers, and the job server in
> portability looks like an exciting candidate for per-job schema
> registry story...
>
> I'm super eager to see if there are other ideas or a contribution we
> can make in this area that's "Beam Row" oriented!
>
> Ryan
>
> [1]
> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>
> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada  wrote:
> >
> > Hello all,
> > I am writing a utility to push data to PubSub. My data class looks
> something like this:
> > ==
> > class MyData {
> >   String someId;
> >   Row someRow;
> >   Row someOtherRow;
> > }
> > ==
> > The schema for the Rows is not known a priori. It is contained by the
> Row. I am then pushing this data to pubsub:
> > ===
> > MyData pushingData = 
> > WhatCoder? coder = 
> >
> > ByteArrayOutputStream os = new ByteArrayOutputStream();
> > coder.encode(pushingData, os);
> >
> > pubsubClient.connect();
> >
> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> > pubsubClient.close();
> > =
> > What's the right coder to use in this case? I don't know if SchemaCoder
> will work, because it seems that it requires the Row's schema a priori. I
> have not been able to make AvroCoder work.
> >
> > Any tips?
> > Best
> > -P.
>


Re: Portability framework: multiple environments in one pipeline

2019-07-23 Thread Chamikara Jayalath
On Tue, Jul 23, 2019 at 3:45 PM Chad Dombrova  wrote:

> Our specific situation is pretty unique, but I think it fits a more
> general pattern.  We use a number of media applications and each comes with
> its own built-in python interpreter (Autodesk Maya and SideFX Houdini, for
> example), and the core modules for each application can only be imported
> within their respective interpreter.  We want to be able to create
> pipelines where certain transforms are hosted within different application
> interpreters, so that we can avoid the ugly workarounds that we have to do
> now.
>
> I can imagine a similar scenario where a user wants to use a number of
> different libraries for different transforms, but the libraries’
> requirements conflict with each other, or perhaps some require python3 and
> others are stuck on python2.
>

Thanks for the details.


>
> Where can I find documentation on the expansion service?  I found a design
> doc which was helpful, but it seems to hew toward the hypothetical, so I
> think there have been a number of concrete steps taken since it was
> written:
>
> https://docs.google.com/document/d/1veiDF2dVH_5y56YxCcri2elCtYJgJnqw9aKPWAgddH8/mobilebasic
>

For now, I suggest you follow one of the existing examples.

We start an expansion service here:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py#L123
Expansion jar file generation and Gradle build logic is here:
https://github.com/apache/beam/blob/master/sdks/python/build.gradle#L408

Currently this is only supported by the Flink runner. Support for the Dataflow
runner is in the works.

There have been discussions about automatically starting expansion services,
and we should be adding documentation around the current solution in the
future.

Thanks,
Cham



>
> -chad
>
>
>
> On Tue, Jul 23, 2019 at 1:39 PM Chamikara Jayalath 
> wrote:
>
>> I think we have primarily focused on the ability to run transforms from
>> multiple SDKs in the same pipeline (cross-language) so far, but as Robert
>> mentioned the framework currently in development should also be usable for
>> running pipelines that use multiple environments that have the same SDK
>> installed as well. I'd love to get more clarity on the exact use-case here
>> (for example, details on why you couldn't run all Python transforms in a
>> single environment) and to know if others have the same requirement.
>>
>> Thanks,
>> Cham
>>
>>
>> On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw 
>> wrote:
>>
>>> Yes, for sure. Support for this is available in some runners (like the
>>> Python Universal Local Runner and Flink) and actively being added to
>>> others (e.g. Dataflow). There are still some rough edges however--one
>>> currently must run an expansion service to define a pipeline step in
>>> an alternative environment (e.g. by registering your transforms and
>>> running
>>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py
>>> ).
>>> We'd like to make this process a lot smoother (and feedback would be
>>> appreciated).
>>>
>>> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova  wrote:
>>> >
>>> > Hi all,
>>> > I'm interested to know if others on the list would find value in the
>>> ability to use multiple environments (e.g. docker images) within a single
>>> pipeline, using some mechanism to identify the environment(s) that a
>>> transform should use. It would be quite useful for us, since our transforms
>>> can have conflicting python requirements, or worse, conflicting interpreter
>>> requirements.  Currently to solve this we have to break the pipeline up
>>> into multiple pipelines and use pubsub to communicate between them, which
>>> is not ideal.
>>> >
>>> > -chad
>>> >
>>>
>>


Re: Portability framework: multiple environments in one pipeline

2019-07-23 Thread Chad Dombrova
Our specific situation is pretty unique, but I think it fits a more general
pattern.  We use a number of media applications and each comes with its own
built-in python interpreter (Autodesk Maya and SideFX Houdini, for
example), and the core modules for each application can only be imported
within their respective interpreter.  We want to be able to create
pipelines where certain transforms are hosted within different application
interpreters, so that we can avoid the ugly workarounds that we have to do
now.

I can imagine a similar scenario where a user wants to use a number of
different libraries for different transforms, but the libraries’
requirements conflict with each other, or perhaps some require python3 and
others are stuck on python2.

Where can I find documentation on the expansion service?  I found a design
doc which was helpful, but it seems to hew toward the hypothetical, so I
think there have been a number of concrete steps taken since it was
written:
https://docs.google.com/document/d/1veiDF2dVH_5y56YxCcri2elCtYJgJnqw9aKPWAgddH8/mobilebasic

-chad



On Tue, Jul 23, 2019 at 1:39 PM Chamikara Jayalath 
wrote:

> I think we have primarily focused on the ability to run transforms from
> multiple SDKs in the same pipeline (cross-language) so far, but as Robert
> mentioned the framework currently in development should also be usable for
> running pipelines that use multiple environments that have the same SDK
> installed as well. I'd love to get more clarity on the exact use-case here
> (for example, details on why you couldn't run all Python transforms in a
> single environment) and to know if others have the same requirement.
>
> Thanks,
> Cham
>
>
> On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw 
> wrote:
>
>> Yes, for sure. Support for this is available in some runners (like the
>> Python Universal Local Runner and Flink) and actively being added to
>> others (e.g. Dataflow). There are still some rough edges however--one
>> currently must run an expansion service to define a pipeline step in
>> an alternative environment (e.g. by registering your transforms and
>> running
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py
>> ).
>> We'd like to make this process a lot smoother (and feedback would be
>> appreciated).
>>
>> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova  wrote:
>> >
>> > Hi all,
>> > I'm interested to know if others on the list would find value in the
>> ability to use multiple environments (e.g. docker images) within a single
>> pipeline, using some mechanism to identify the environment(s) that a
>> transform should use. It would be quite useful for us, since our transforms
>> can have conflicting python requirements, or worse, conflicting interpreter
>> requirements.  Currently to solve this we have to break the pipeline up
>> into multiple pipelines and use pubsub to communicate between them, which
>> is not ideal.
>> >
>> > -chad
>> >
>>
>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-23 Thread Austin Bennett
Pablo,

Assigned https://issues.apache.org/jira/browse/BEAM-7607 to you, to make it
even more likely that it is still around on the 25th :-)

Cheers,
Austin

On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada  wrote:

> Hi all,
> I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is
> a single-line change - and we'd spend 40 minutes chitchatting, so I'll also
> be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a
> Python issue (also for the BigQuery sink!).
> Thanks!
> -P.
>
> On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada  wrote:
>
>> Hello all,
>>
>> This will be streamed on youtube on this link:
>> https://www.youtube.com/watch?v=xpIpEO4PUDo
>>
>> I think there will be a live chat, so I will hopefully be available to
>> answer questions. To be honest, my workflow is not super efficient, but...
>> oh well, hopefully it will be at least somewhat helpful to others : )
>> Best
>> -P.
>>
>> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>>
>>> +1, I'd love to see this as a recording. Will you stick it up on youtube
>>> afterwards?
>>>
>>> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
>>> wrote:
>>>
 Thanks, Pablo! Looking forward to it! Hopefully, it will also be
 recorded.

 On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
 wrote:

> Yes! So I will be working on a small feature request for Java's
> BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607
>
> Maybe I'll do something for Python next month. : )
> Best
> -P.
>
> On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
> wrote:
>
>> +1, I really appreciate this initiative. It would be really helpful for
>> newbies like me.
>>
>> Is it possible to list out the things that you are planning
>> to cover?
>>
>>
>>
>>
>> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
>> wrote:
>>
>>> Thanks for organizing this Pablo, it'll be very helpful!
>>>
>>> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
>>> wrote:
>>>
 Hello all,
 I'll be having a session where I live-fix a Beam bug for 1 hour
 next week. Everyone is invited.

 It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I
 will finish a full change in that time frame, but we'll see.

 I have not yet decided if I will do this via hangouts, or via a
 youtube livestream. In any case, I will share the link here in the 
 next few
 days.

 I will most likely work on the Java SDK (I have a little feature
 request in mind).

 Thanks!
 -P.

>>>


Re: Portability framework: multiple environments in one pipeline

2019-07-23 Thread Chamikara Jayalath
I think we have primarily focused on the ability to run transforms from
multiple SDKs in the same pipeline (cross-language) so far, but as Robert
mentioned the framework currently in development should also be usable for
running pipelines that use multiple environments that have the same SDK
installed as well. I'd love to get more clarity on the exact use-case here
(for example, details on why you couldn't run all Python transforms in a
single environment) and to know if others have the same requirement.

Thanks,
Cham

On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw 
wrote:

> Yes, for sure. Support for this is available in some runners (like the
> Python Universal Local Runner and Flink) and actively being added to
> others (e.g. Dataflow). There are still some rough edges however--one
> currently must run an expansion service to define a pipeline step in
> an alternative environment (e.g. by registering your transforms and
> running
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py
> ).
> We'd like to make this process a lot smoother (and feedback would be
> appreciated).
>
> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova  wrote:
> >
> > Hi all,
> > I'm interested to know if others on the list would find value in the
> ability to use multiple environments (e.g. docker images) within a single
> pipeline, using some mechanism to identify the environment(s) that a
> transform should use. It would be quite useful for us, since our transforms
> can have conflicting python requirements, or worse, conflicting interpreter
> requirements.  Currently to solve this we have to break the pipeline up
> into multiple pipelines and use pubsub to communicate between them, which
> is not ideal.
> >
> > -chad
> >
>


Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST

2019-07-23 Thread Pablo Estrada
Hi all,
I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is
a single-line change - and we'd spend 40 minutes chitchatting, so I'll also
be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a
Python issue (also for the BigQuery sink!).
Thanks!
-P.

On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada  wrote:

> Hello all,
>
> This will be streamed on youtube on this link:
> https://www.youtube.com/watch?v=xpIpEO4PUDo
>
> I think there will be a live chat, so I will hopefully be available to
> answer questions. To be honest, my workflow is not super efficient, but...
> oh well, hopefully it will be at least somewhat helpful to others : )
> Best
> -P.
>
> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell  wrote:
>
>> +1, I'd love to see this as a recording. Will you stick it up on youtube
>> afterwards?
>>
>> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog 
>> wrote:
>>
>>> Thanks, Pablo! Looking forward to it! Hopefully, it will also be
>>> recorded.
>>>
>>> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada 
>>> wrote:
>>>
 Yes! So I will be working on a small feature request for Java's
 BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607

 Maybe I'll do something for Python next month. : )
 Best
 -P.

 On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar 
 wrote:

> +1, I really appreciate this initiative. It would be really helpful for
> newbies like me.
>
> Is it possible to list out the things that you are planning
> to cover?
>
>
>
>
> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang 
> wrote:
>
>> Thanks for organizing this Pablo, it'll be very helpful!
>>
>> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada 
>> wrote:
>>
>>> Hello all,
>>> I'll be having a session where I live-fix a Beam bug for 1 hour next
>>> week. Everyone is invited.
>>>
>>> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I
>>> will finish a full change in that time frame, but we'll see.
>>>
>>> I have not yet decided if I will do this via hangouts, or via a
>>> youtube livestream. In any case, I will share the link here in the next 
>>> few
>>> days.
>>>
>>> I will most likely work on the Java SDK (I have a little feature
>>> request in mind).
>>>
>>> Thanks!
>>> -P.
>>>
>>


Re: Choosing a coder for a class that contains a Row?

2019-07-23 Thread Ryan Skraba
Hello Pablo!  Just to clarify -- the Row schemas aren't known at
pipeline construction time, but can be discovered from the instance of
MyData?

Once discovered, is the schema "homogeneous" for all instances of
MyData?  (i.e. someRow will always have the same schema for all
instances afterwards, and there won't be another someRow with a
different schema).

We've encountered a parallel "problem" with pure Avro data, where the
instance is a GenericRecord containing its own Avro schema but
*without* knowing the schema until the pipeline is run.  The solution
that we've been using is a bit hacky, but we're using an ad hoc
per-job schema registry and a custom coder where each worker saves the
schema in the `encode` before writing the record, and loads it lazily
in the `decode` before reading.
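
A compressed sketch of that pattern (not the LazyAvroCoder linked below; the
SchemaRegistryClient interface here is a hypothetical stand-in for the ad hoc
per-job registry):
==
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Hypothetical per-job registry: maps a stable id to an Avro schema. */
interface SchemaRegistryClient extends java.io.Serializable {
  String register(Schema schema);   // called on encode, returns the schema id
  Schema lookup(String schemaId);   // called lazily on decode
}

class LazySchemaAvroCoder extends CustomCoder<GenericRecord> {
  private final SchemaRegistryClient registry;
  private transient Schema cachedSchema; // resolved lazily on each worker

  LazySchemaAvroCoder(SchemaRegistryClient registry) {
    this.registry = registry;
  }

  @Override
  public void encode(GenericRecord record, OutputStream out) throws IOException {
    // Save the schema (discovered from the record itself) before the payload.
    String schemaId = registry.register(record.getSchema());
    StringUtf8Coder.of().encode(schemaId, out);
    // Direct (unbuffered) encoder so this coder can be nested in other coders.
    Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
    encoder.flush();
  }

  @Override
  public GenericRecord decode(InputStream in) throws IOException {
    String schemaId = StringUtf8Coder.of().decode(in);
    if (cachedSchema == null) {
      // Assumes one homogeneous schema per job, as discussed above; a real
      // implementation would cache per schema id.
      cachedSchema = registry.lookup(schemaId);
    }
    Decoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
    return new GenericDatumReader<GenericRecord>(cachedSchema).read(null, decoder);
  }
}
==
The direct Avro encoder/decoder variants avoid read-ahead buffering, which
matters when the coder is nested inside other Beam coders.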

The original code is available[1] (be gentle, it was written with Beam
0.4.0-incubating... and has continued to work until now).

In practice, the ad hoc schema registry is just a server socket in the
Spark driver, in-memory for DirectRunner / local mode, and a
read/write to a known location in other runners.  There are definitely
other solutions with side-inputs and providers, and the job server in
portability looks like an exciting candidate for per-job schema
registry story...

I'm super eager to see if there are other ideas or a contribution we
can make in this area that's "Beam Row" oriented!

Ryan

[1] 
https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java

On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada  wrote:
>
> Hello all,
> I am writing a utility to push data to PubSub. My data class looks something 
> like this:
> ==
> class MyData {
>   String someId;
>   Row someRow;
>   Row someOtherRow;
> }
> ==
> The schema for the Rows is not known a priori. It is contained by the Row. I
> am then pushing this data to pubsub:
> ===
> MyData pushingData = 
> WhatCoder? coder = 
>
> ByteArrayOutputStream os = new ByteArrayOutputStream();
> coder.encode(pushingData, os);
>
> pubsubClient.connect();
> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> pubsubClient.close();
> =
> What's the right coder to use in this case? I don't know if SchemaCoder will 
> work, because it seems that it requires the Row's schema a priori. I have not 
> been able to make AvroCoder work.
>
> Any tips?
> Best
> -P.