Applying keyed state on top of stream from CoGroupByKey output
Hi All, I am trying to merge 2 data streams using CoGroupByKey and then applying a stateful ParDo. The input to the cogroup function is (word, count). The full problem is explained on Stack Overflow: https://stackoverflow.com/questions/57162131/applying-keyed-state-on-top-of-stream-from-co-group-stream Please help me. -- Regards, *Kiran M Hurakadli.*
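To restate the setup for anyone following along: CoGroupByKey takes two keyed PCollections and, per key, emits the grouped values from each side; a stateful ParDo downstream can then accumulate per-key state. A minimal pure-Python sketch of those semantics (the names `counts_a`/`counts_b` are invented for illustration, and this deliberately does not use Beam itself):

```python
from collections import defaultdict

def co_group_by_key(stream_a, stream_b):
    """Simulate CoGroupByKey: per key, collect the values from each input."""
    grouped = defaultdict(lambda: ([], []))
    for k, v in stream_a:
        grouped[k][0].append(v)
    for k, v in stream_b:
        grouped[k][1].append(v)
    return dict(grouped)

def stateful_sum(grouped):
    """Simulate a stateful ParDo keeping a running per-key total."""
    state = {}
    for k, (vals_a, vals_b) in grouped.items():
        state[k] = state.get(k, 0) + sum(vals_a) + sum(vals_b)
    return state

counts_a = [("hello", 1), ("world", 2), ("hello", 3)]
counts_b = [("hello", 10), ("beam", 5)]
merged = co_group_by_key(counts_a, counts_b)
# merged["hello"] == ([1, 3], [10])
totals = stateful_sum(merged)
# totals == {"hello": 14, "world": 2, "beam": 5}
```

In real Beam the per-key state would live in a `BagState`/`ValueState` declared on the DoFn rather than a plain dict, but the key-partitioned accumulation works the same way.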
Re: Choosing a coder for a class that contains a Row?
+dev Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful. The data is change data capture from databases, and I'm putting it into a Beam Row. The schema for the Row is generally homogeneous, but subject to change at some point in the future if the schema in the database changes. It's unusual and unlikely, but possible. I have no idea how Beam deals with evolving schemas. +Reuven Lax is there documentation / examples / anything around this? : ) I think evolving schemas is an interesting question. For now, I am going to Java-serialize the objects, and delay figuring this out. But I reckon I'll have to come back to this... Best -P. On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba wrote: > Hello Pablo! Just to clarify -- the Row schemas aren't known at > pipeline construction time, but can be discovered from the instance of > MyData? > > Once discovered, is the schema "homogeneous" for all instances of > MyData? (i.e. someRow will always have the same schema for all > instances afterwards, and there won't be another someRow with a > different schema). > > We've encountered a parallel "problem" with pure Avro data, where the > instance is a GenericRecord containing its own Avro schema but > *without* knowing the schema until the pipeline is run. The solution > that we've been using is a bit hacky, but we're using an ad hoc > per-job schema registry and a custom coder where each worker saves the > schema in the `encode` before writing the record, and loads it lazily > in the `decode` before reading. > > The original code is available[1] (be gentle, it was written with Beam > 0.4.0-incubating... and has continued to work until now). > > In practice, the ad hoc schema registry is just a server socket in the > Spark driver, in-memory for DirectRunner / local mode, and a > read/write to a known location in other runners. 
There are definitely > other solutions with side-inputs and providers, and the job server in > portability looks like an exciting candidate for per-job schema > registry story... > > I'm super eager to see if there are other ideas or a contribution we > can make in this area that's "Beam Row" oriented! > > Ryan > > [1] > https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java > > On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada wrote: > > > > Hello all, > > I am writing a utility to push data to PubSub. My data class looks > something like so: > > == > > class MyData { > > String someId; > > Row someRow; > > Row someOtherRow; > > } > > == > > The schema for the Rows is not known a-priori. It is contained by the > Row. I am then pushing this data to pubsub: > > === > > MyData pushingData = > > WhatCoder? coder = > > > > ByteArrayOutputStream os = new ByteArrayOutputStream(); > > coder.encode(this, os); > > > > pubsubClient.connect(); > > > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build()); > > pubsubClient.close(); > > = > > What's the right coder to use in this case? I don't know if SchemaCoder > will work, because it seems that it requires the Row's schema a priori. I > have not been able to make AvroCoder work. > > > > Any tips? > > Best > > -P. >
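For context on the lazy-coder pattern Ryan describes: the coder writes the schema (or a fingerprint of it) alongside each record at encode time and recovers it lazily at decode time, so the schema need not be known at pipeline construction. A rough pure-Python sketch of the framing — not the actual LazyAvroCoder, and the in-memory dict standing in for the registry is purely an assumption for illustration:

```python
import hashlib
import json
import struct

# Ad hoc "schema registry": fingerprint -> schema. In the real LazyAvroCoder
# this is a socket/file-backed service; a plain dict stands in here.
REGISTRY = {}

def encode(schema, value):
    """Register the schema, then prefix the payload with its fingerprint."""
    schema_bytes = json.dumps(schema, sort_keys=True).encode()
    fp = hashlib.sha256(schema_bytes).digest()[:8]
    REGISTRY[fp] = schema
    payload = json.dumps(value).encode()
    return fp + struct.pack(">I", len(payload)) + payload

def decode(blob):
    """Look the schema up lazily by fingerprint, then decode the value."""
    fp, rest = blob[:8], blob[8:]
    (length,) = struct.unpack(">I", rest[:4])
    value = json.loads(rest[4:4 + length].decode())
    return REGISTRY[fp], value

schema = {"fields": ["someId", "someRow"]}
blob = encode(schema, {"someId": "x", "someRow": [1, 2]})
recovered_schema, recovered = decode(blob)
# recovered == {"someId": "x", "someRow": [1, 2]}
```

The key property is that only the small fingerprint travels with every record, while the full schema is fetched once per worker and cached.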
Re: Portability framework: multiple environments in one pipeline
On Tue, Jul 23, 2019 at 3:45 PM Chad Dombrova wrote: > Our specific situation is pretty unique, but I think it fits a more > general pattern. We use a number of media applications and each comes with > its own built-in python interpreter (Autodesk Maya and SideFX Houdini, for > example), and the core modules for each application can only be imported > within their respective interpreter. We want to be able to create > pipelines where certain transforms are hosted within different application > interpreters, so that we can avoid the ugly workarounds that we have to do > now. > > I can imagine a similar scenario where a user wants to use a number of > different libraries for different transforms, but the libraries’ > requirements conflict with each other, or perhaps some require python3 and > others are stuck on python2. > Thanks for the details. > > Where can I find documentation on the expansion service? I found a design > doc which was helpful, but it seems to hew toward the hypothetical, so I > think there have been a number of concrete steps taken since it was > written: > > https://docs.google.com/document/d/1veiDF2dVH_5y56YxCcri2elCtYJgJnqw9aKPWAgddH8/mobilebasic > For now, I suggest you follow one of the existing examples. For example, we start an expansion service here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py#L123 Expansion jar file generation and Gradle build logic is here: https://github.com/apache/beam/blob/master/sdks/python/build.gradle#L408 Currently this is only supported by the Flink runner. Support for the Dataflow runner is in the works. There have been discussions about automatically starting expansion services, and we should be adding documentation around the current solution in the future. 
Thanks, Cham > > -chad > > > > On Tue, Jul 23, 2019 at 1:39 PM Chamikara Jayalath > wrote: > >> I think we have primarily focused on the ability to run transforms from >> multiple SDKs in the same pipeline (cross-language) so far, but as Robert >> mentioned the framework currently in development should also be usable for >> running pipelines that use multiple environments that have the same SDK >> installed as well. I'd love to get more clarity on the exact use-case here >> (for example, details on why you couldn't run all Python transforms in a >> single environment) and to know if others have the same requirement. >> >> Thanks, >> Cham >> >> >> On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw >> wrote: >> >>> Yes, for sure. Support for this is available in some runners (like the >>> Python Universal Local Runner and Flink) and actively being added to >>> others (e.g. Dataflow). There are still some rough edges however--one >>> currently must run an expansion service to define a pipeline step in >>> an alternative environment (e.g. by registering your transforms and >>> running >>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py >>> ). >>> We'd like to make this process a lot smoother (and feedback would be >>> appreciated). >>> >>> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova wrote: >>> > >>> > Hi all, >>> > I'm interested to know if others on the list would find value in the >>> ability to use multiple environments (e.g. docker images) within a single >>> pipeline, using some mechanism to identify the environment(s) that a >>> transform should use. It would be quite useful for us, since our transforms >>> can have conflicting python requirements, or worse, conflicting interpreter >>> requirements. Currently to solve this we have to break the pipeline up >>> into multiple pipelines and use pubsub to communicate between them, which >>> is not ideal. >>> > >>> > -chad >>> > >>> >>
Re: Portability framework: multiple environments in one pipeline
Our specific situation is pretty unique, but I think it fits a more general pattern. We use a number of media applications and each comes with its own built-in python interpreter (Autodesk Maya and SideFX Houdini, for example), and the core modules for each application can only be imported within their respective interpreter. We want to be able to create pipelines where certain transforms are hosted within different application interpreters, so that we can avoid the ugly workarounds that we have to do now. I can imagine a similar scenario where a user wants to use a number of different libraries for different transforms, but the libraries’ requirements conflict with each other, or perhaps some require python3 and others are stuck on python2. Where can I find documentation on the expansion service? I found a design doc which was helpful, but it seems to hew toward the hypothetical, so I think there have been a number of concrete steps taken since it was written: https://docs.google.com/document/d/1veiDF2dVH_5y56YxCcri2elCtYJgJnqw9aKPWAgddH8/mobilebasic -chad On Tue, Jul 23, 2019 at 1:39 PM Chamikara Jayalath wrote: > I think we have primarily focused on the ability to run transforms from > multiple SDKs in the same pipeline (cross-language) so far, but as Robert > mentioned the framework currently in development should also be usable for > running pipelines that use multiple environments that have the same SDK > installed as well. I'd love to get more clarity on the exact use-case here > (for example, details on why you couldn't run all Python transforms in a > single environment) and to know if others have the same requirement. > > Thanks, > Cham > > > On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw > wrote: > >> Yes, for sure. Support for this is available in some runners (like the >> Python Universal Local Runner and Flink) and actively being added to >> others (e.g. Dataflow). 
There are still some rough edges however--one >> currently must run an expansion service to define a pipeline step in >> an alternative environment (e.g. by registering your transforms and >> running >> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py >> ). >> We'd like to make this process a lot smoother (and feedback would be >> appreciated). >> >> On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova wrote: >> > >> > Hi all, >> > I'm interested to know if others on the list would find value in the >> ability to use multiple environments (e.g. docker images) within a single >> pipeline, using some mechanism to identify the environment(s) that a >> transform should use. It would be quite useful for us, since our transforms >> can have conflicting python requirements, or worse, conflicting interpreter >> requirements. Currently to solve this we have to break the pipeline up >> into multiple pipelines and use pubsub to communicate between them, which >> is not ideal. >> > >> > -chad >> > >> >
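To make the multi-environment idea in this thread concrete: conceptually, each transform in the pipeline carries an environment id, and the runner routes the transform's execution to the matching SDK harness (docker image or interpreter). A toy pure-Python sketch of that bookkeeping — the environment names, image URLs, and the `run` dispatch are all invented for illustration; this is not the Beam portability API:

```python
# Hypothetical mapping from environment id to the harness that serves it.
ENVIRONMENTS = {
    "maya-py": "docker://studio/maya-python:2019",
    "houdini-py": "docker://studio/houdini-python:17",
}

class Transform:
    """A pipeline step tagged with the environment it must execute in."""
    def __init__(self, name, fn, environment):
        self.name = name
        self.fn = fn
        self.environment = environment

def run(pipeline, data):
    """Toy 'runner': record which environment each step would dispatch to,
    then apply the steps in order."""
    trace = []
    for t in pipeline:
        trace.append((t.name, ENVIRONMENTS[t.environment]))
        data = [t.fn(x) for x in data]
    return data, trace

pipeline = [
    Transform("ReadScene", lambda x: x * 2, "maya-py"),
    Transform("Simulate", lambda x: x + 1, "houdini-py"),
]
out, trace = run(pipeline, [1, 2, 3])
# out == [3, 5, 7]; trace records the environment each step was routed to
```

In the portability framework this routing happens via the environment proto attached to each transform in the pipeline graph; the expansion service is what lets a transform defined in one environment be inserted into a pipeline built in another.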
Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST
Pablo, Assigned https://issues.apache.org/jira/browse/BEAM-7607 to you, to make it even more likely that it is still around on the 25th :-) Cheers, Austin On Tue, Jul 23, 2019 at 11:24 AM Pablo Estrada wrote: > Hi all, > I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is > a single-line change - and we'd spend 40 minutes chitchatting, so I'll also > be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a > Python issue (also for the BigQuery sink!). > Thanks! > -P. > > On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada wrote: > >> Hello all, >> >> This will be streamed on youtube on this link: >> https://www.youtube.com/watch?v=xpIpEO4PUDo >> >> I think there will be a live chat, so I will hopefully be available to >> answer questions. To be honest, my workflow is not super efficient, but... >> oh well, hopefully it will be at least somewhat helpful to others : ) >> Best >> -P. >> >> On Thu, Jul 18, 2019 at 12:59 AM Tim Sell wrote: >> >>> +1, I'd love to see this as a recording. Will you stick it up on youtube >>> afterwards? >>> >>> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog >>> wrote: >>> Thanks, Pablo! Looking forward to it! Hopefully, it will also be recorded as well. On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada wrote: > Yes! So I will be working on a small feature request for Java's > BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607 > > Maybe I'll do something for Python next month. : ) > Best > -P. > > On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar > wrote: > >> +1, I really appreciate this initiative. It would be really helpful for >> newbies like me. >> >> Is it possible to list out what are the things that you are planning >> to cover? >> >> >> >> >> On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang >> wrote: >> >>> Thanks for organizing this Pablo, it'll be very helpful! 
>>> >>> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada >>> wrote: >>> Hello all, I'll be having a session where I live-fix a Beam bug for 1 hour next week. Everyone is invited. It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I will finish a full change in that time frame, but we'll see. I have not yet decided if I will do this via hangouts, or via a youtube livestream. In any case, I will share the link here in the next few days. I will most likely work on the Java SDK (I have a little feature request in mind). Thanks! -P. >>>
Re: Portability framework: multiple environments in one pipeline
I think we have primarily focused on the ability to run transforms from multiple SDKs in the same pipeline (cross-language) so far, but as Robert mentioned the framework currently in development should also be usable for running pipelines that use multiple environments that have the same SDK installed as well. I'd love to get more clarity on the exact use-case here (for example, details on why you couldn't run all Python transforms in a single environment) and to know if others have the same requirement. Thanks, Cham On Mon, Jul 22, 2019 at 12:31 AM Robert Bradshaw wrote: > Yes, for sure. Support for this is available in some runners (like the > Python Universal Local Runner and Flink) and actively being added to > others (e.g. Dataflow). There are still some rough edges however--one > currently must run an expansion service to define a pipeline step in > an alternative environment (e.g. by registering your transforms and > running > https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/expansion_service_test.py > ). > We'd like to make this process a lot smoother (and feedback would be > appreciated). > > On Sat, Jul 20, 2019 at 7:57 PM Chad Dombrova wrote: > > > > Hi all, > > I'm interested to know if others on the list would find value in the > ability to use multiple environments (e.g. docker images) within a single > pipeline, using some mechanism to identify the environment(s) that a > transform should use. It would be quite useful for us, since our transforms > can have conflicting python requirements, or worse, conflicting interpreter > requirements. Currently to solve this we have to break the pipeline up > into multiple pipelines and use pubsub to communicate between them, which > is not ideal. > > > > -chad > > >
Re: Live fixing of a Beam bug on July 25 at 3:30pm-4:30pm PST
Hi all, I've just realized that https://issues.apache.org/jira/browse/BEAM-7607 is a single-line change - and we'd spend 40 minutes chitchatting, so I'll also be working on https://jira.apache.org/jira/browse/BEAM-7803, which is a Python issue (also for the BigQuery sink!). Thanks! -P. On Sat, Jul 20, 2019 at 2:05 PM Pablo Estrada wrote: > Hello all, > > This will be streamed on youtube on this link: > https://www.youtube.com/watch?v=xpIpEO4PUDo > > I think there will be a live chat, so I will hopefully be available to > answer questions. To be honest, my workflow is not super efficient, but... > oh well, hopefully it will be at least somewhat helpful to others : ) > Best > -P. > > On Thu, Jul 18, 2019 at 12:59 AM Tim Sell wrote: > >> +1, I'd love to see this as a recording. Will you stick it up on youtube >> afterwards? >> >> On Thu, Jul 18, 2019 at 4:00 AM sridhar inuog >> wrote: >> >>> Thanks, Pablo! Looking forward to it! Hopefully, it will also be >>> recorded as well. >>> >>> On Wed, Jul 17, 2019 at 2:50 PM Pablo Estrada >>> wrote: >>> Yes! So I will be working on a small feature request for Java's BigQueryIO: https://issues.apache.org/jira/browse/BEAM-7607 Maybe I'll do something for Python next month. : ) Best -P. On Wed, Jul 17, 2019 at 12:32 PM Rakesh Kumar wrote: > +1, I really appreciate this initiative. It would be really helpful for > newbies like me. > > Is it possible to list out what are the things that you are planning > to cover? > > > > > On Tue, Jul 16, 2019 at 11:19 AM Yichi Zhang > wrote: > >> Thanks for organizing this Pablo, it'll be very helpful! >> >> On Tue, Jul 16, 2019 at 10:57 AM Pablo Estrada >> wrote: >> >>> Hello all, >>> I'll be having a session where I live-fix a Beam bug for 1 hour next >>> week. Everyone is invited. >>> >>> It will be on July 25, between 3:30pm and 4:30pm PST. Hopefully I >>> will finish a full change in that time frame, but we'll see. 
>>> >>> I have not yet decided if I will do this via hangouts, or via a >>> youtube livestream. In any case, I will share the link here in the next >>> few >>> days. >>> >>> I will most likely work on the Java SDK (I have a little feature >>> request in mind). >>> >>> Thanks! >>> -P. >>> >>
Re: Choosing a coder for a class that contains a Row?
Hello Pablo! Just to clarify -- the Row schemas aren't known at pipeline construction time, but can be discovered from the instance of MyData? Once discovered, is the schema "homogeneous" for all instances of MyData? (i.e. someRow will always have the same schema for all instances afterwards, and there won't be another someRow with a different schema). We've encountered a parallel "problem" with pure Avro data, where the instance is a GenericRecord containing its own Avro schema but *without* knowing the schema until the pipeline is run. The solution that we've been using is a bit hacky, but we're using an ad hoc per-job schema registry and a custom coder where each worker saves the schema in the `encode` before writing the record, and loads it lazily in the `decode` before reading. The original code is available[1] (be gentle, it was written with Beam 0.4.0-incubating... and has continued to work until now). In practice, the ad hoc schema registry is just a server socket in the Spark driver, in-memory for DirectRunner / local mode, and a read/write to a known location in other runners. There are definitely other solutions with side-inputs and providers, and the job server in portability looks like an exciting candidate for a per-job schema registry story... I'm super eager to see if there are other ideas or a contribution we can make in this area that's "Beam Row" oriented! Ryan [1] https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada wrote: > > Hello all, > I am writing a utility to push data to PubSub. My data class looks something > like so: > == > class MyData { > String someId; > Row someRow; > Row someOtherRow; > } > == > The schema for the Rows is not known a priori. It is contained by the Row. I > am then pushing this data to pubsub: > === > MyData pushingData = > WhatCoder? 
coder = > > ByteArrayOutputStream os = new ByteArrayOutputStream(); > coder.encode(this, os); > > pubsubClient.connect(); > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build()); > pubsubClient.close(); > = > What's the right coder to use in this case? I don't know if SchemaCoder will > work, because it seems that it requires the Row's schema a priori. I have not > been able to make AvroCoder work. > > Any tips? > Best > -P.