FWIW adding (and removing) things from the highly dynamic parameter list of @ProcessElement is precisely what it is intended for.
Kenn

On Wed, May 23, 2018 at 2:31 PM Reuven Lax <re...@google.com> wrote:

> Romain, maybe it would be useful for us to find some time on Slack. I'd like to understand your concerns. Also keep in mind that I'm tagging all these classes as Experimental for now, so we can definitely change these interfaces around if we decide they are not the best ones.
>
> Reuven
>
> On Tue, May 22, 2018 at 11:35 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Why not extend ProcessContext to add the new remapped output? But it looks good. (The part I don't like is that creating a new context each time a new feature is added hurts users. What happens when Beam adds some reactive support? A ReactiveOutputReceiver?)
>
> The pipeline sounds like the wrong storage: once distributed, the instances are serialized, which kind of breaks the lifecycle of the original instance, and there is no real release/close hook on them anymore, right? Not sure we can do better than DoFn/source-embedded instances today.
>
> On Wed, May 23, 2018 at 08:02 Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> On Wed, May 23, 2018 at 07:55 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>
>> Hi,
>>
>> IMHO, it would be better to have an explicit transform/IO as a converter.
>>
>> It would be easier for users.
>>
>> Another option would be to use a "TypeConverter/SchemaConverter" map as we do in Camel: Beam could check the source/destination "type" and check in the map if there's a converter available. This map can be stored as part of the pipeline (as we do for filesystem registration).
>
> It works in Camel because it is not strongly typed, isn't it? So it may require a new Beam pipeline API.
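JB's Camel-style idea, a map of converters looked up by source and target type, can be sketched in plain Java. This is a minimal illustration under stated assumptions, not a Beam or Camel API: `ConverterRegistry`, `register`, `canConvert` and `convert` are hypothetical names. It also makes Romain's typing concern concrete: the lookup is keyed on runtime classes, so a missing converter only surfaces when `convert` runs, unless something like `canConvert` is checked before the pipeline starts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

class ConverterRegistryDemo {
  public static void main(String[] args) {
    ConverterRegistry registry = new ConverterRegistry();
    registry.register(String.class, Integer.class, Integer::parseInt);

    Integer n = registry.convert("42", Integer.class);
    System.out.println(n + 1); // prints 43
    // No Integer -> String converter was registered, so a runner could reject the pipeline up front.
    System.out.println(registry.canConvert(Integer.class, String.class)); // prints false
  }
}

// Hypothetical Camel-style registry (not a Beam API): converters keyed by (source type, target type).
class ConverterRegistry {
  // Pairs a source class with a target class; the record supplies equals/hashCode.
  private record Key(Class<?> source, Class<?> target) {}

  private final Map<Key, Function<Object, Object>> converters = new HashMap<>();

  <S, T> void register(Class<S> source, Class<T> target, Function<? super S, ? extends T> fn) {
    converters.put(new Key(source, target), value -> fn.apply(source.cast(value)));
  }

  boolean canConvert(Class<?> source, Class<?> target) {
    return converters.containsKey(new Key(source, target));
  }

  // The lookup uses the value's runtime class, so a missing converter is only detected here, at run time.
  <T> T convert(Object value, Class<T> target) {
    Objects.requireNonNull(value, "value");
    Function<Object, Object> fn = converters.get(new Key(value.getClass(), target));
    if (fn == null) {
      throw new IllegalArgumentException(
          "No converter from " + value.getClass().getName() + " to " + target.getName());
    }
    return target.cast(fn.apply(value));
  }
}
```

The checked `target.cast` restores a static type at the call site, but the compiler cannot prove a converter exists; that is the gap an explicit, typed transform would close.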
>
> +1 for the explicit transform. If it is added to the pipeline API like a coder, it wouldn't break the fluent API:
>
>     p.apply(io).setOutputType(Foo.class)
>
> Coders could be a workaround since they own the type, but since the PCollection is the real owner, it is surely saner this way, no?
>
> It also probably needs to ensure all converters are present before running the pipeline; having no implicit environment converter support is probably a good start, to avoid late surprises.
>
>> My $0.01
>>
>> Regards
>> JB
>
> On 23/05/2018 07:51, Romain Manni-Bucau wrote:
>
> How does it work on the pipeline side? Do you generate these "virtual" IOs at build time so the fluent API works without erasing generics?
>
> Ex: SQL(row)->BigQuery(native) will not compile, so we need SQL(row)->BigQuery(row).
>
> Side note unrelated to Row: if you add another registry, maybe a prerequisite is to ensure Beam has a kind of singleton/context, to avoid duplicating it or not tracking it properly. These kinds of converters will in general need a global close, not only a per-record one:
>
>     converter.init(); converter.convert(row); ... converter.destroy();
>
> otherwise it easily leaks. This is why it may require some way to not recreate it. A quick fix, if you are in ByteBuddy already, could be to add it to setup/teardown; being more global would be nicer but is more challenging.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> On Wed, May 23, 2018 at 07:22 Reuven Lax <re...@google.com> wrote:
>
> No, the only modules we need to add to core are the ones we choose to add. For example, I will probably add a registration for TableRow/TableSchema (GCP BigQuery) so these can work seamlessly with schemas. However, I will add that to the GCP module, so only someone depending on that module needs to pull in that dependency. The Java ServiceLoader framework can be used by these modules to register schemas for their types (we already do something similar for FileSystem and for coders as well).
>
> BTW, right now I'm doing the conversion back and forth between Row objects in the ByteBuddy-generated bytecode that we generate in order to invoke DoFns.
>
> Reuven
>
> On Tue, May 22, 2018 at 10:04 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Hmm, the pluggability part is close to what I wanted to do with JsonObject as a main API (to avoid redoing a "row" API and schema API). Row.as(Class<T>) sounds good, but then does it mean we'll get modules like beam-sdk-java-row-jsonobject (I'm not against it, just trying to understand here)? If so, how can an IO use as() with the type it expects? Doesn't it lead to having tons of these modules in the end?
>
> On Wed, May 23, 2018 at 04:57 Reuven Lax <re...@google.com> wrote:
>
> By the way Romain, if you have specific scenarios in mind I would love to hear them. I can try to guess what exactly you would like to get out of schemas, but it would work better if you gave me concrete scenarios that you would like to work.
>
> Reuven
>
> On Tue, May 22, 2018 at 7:45 PM Reuven Lax <re...@google.com> wrote:
>
> Yeah, what I'm working on will help with IO. Basically, if you register a function with SchemaRegistry that converts back and forth between a type (say JsonObject) and a Beam Row, then it is applied by the framework behind the scenes as part of DoFn invocation. Concrete example: let's say I have an IO that reads JSON objects:
>
>     class MyJsonIORead extends PTransform<PBegin, PCollection<JsonObject>> { ... }
>
> If you register a schema for this type (or you can also just set the schema directly on the output PCollection), then Beam knows how to convert back and forth between JsonObject and Row. So the next ParDo can look like:
>
>     p.apply(new MyJsonIORead())
>      .apply(ParDo.of(new DoFn<JsonObject, T>() {
>        @ProcessElement
>        public void process(@Element Row row) {
>          ...
>        }
>      }));
>
> And Beam will automatically convert the JsonObject to a Row for processing (you aren't forced to do this, of course; you can always ask for it as a JsonObject).
>
> The same is true for output. If you have a sink that takes in JsonObject but the transform before it produces Row objects (for instance, because the transform before it is Beam SQL), Beam can automatically convert the Row back to a JsonObject for you.
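The behavior Reuven describes, where the framework looks at what the `@ProcessElement` method asks for and converts the element behind the scenes, can be approximated with plain reflection. This is a sketch under assumptions, not Beam's implementation (per Reuven's note, Beam does this in ByteBuddy-generated invoker bytecode): `@HandleElement`, `SimpleRow`, `RowConverters` and `Invoker` are hypothetical stand-ins for `@ProcessElement`, `Row`, `SchemaRegistry` and the DoFn invoker, and only the type-to-row direction is shown.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ParameterConversionDemo {
  public static void main(String[] args) {
    RowConverters converters = new RowConverters();
    converters.register(Person.class, p -> new SimpleRow(Map.of("name", p.name(), "age", p.age())));

    // The same element reaches each function in the form its annotated method asks for.
    Invoker.invoke(new WantsRow(), new Person("Ada", 36), converters);    // prints "row name = Ada"
    Invoker.invoke(new WantsPerson(), new Person("Ada", 36), converters); // prints "person = Person[name=Ada, age=36]"
  }
}

// Hypothetical marker standing in for Beam's @ProcessElement.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface HandleElement {}

// Hypothetical user type, standing in for something like JsonObject.
record Person(String name, int age) {}

// Hypothetical stand-in for a Beam Row: named field values.
record SimpleRow(Map<String, Object> values) {}

class WantsRow {
  @HandleElement
  public void process(SimpleRow row) {
    System.out.println("row name = " + row.values().get("name"));
  }
}

class WantsPerson {
  @HandleElement
  public void process(Person person) {
    System.out.println("person = " + person);
  }
}

// Hypothetical registry of "type -> row" functions (one direction only, for brevity).
class RowConverters {
  private final Map<Class<?>, Function<Object, SimpleRow>> toRow = new HashMap<>();

  <T> void register(Class<T> type, Function<? super T, SimpleRow> fn) {
    toRow.put(type, value -> fn.apply(type.cast(value)));
  }

  SimpleRow convert(Object value) {
    Function<Object, SimpleRow> fn = toRow.get(value.getClass());
    if (fn == null) {
      throw new IllegalArgumentException("No row conversion for " + value.getClass().getName());
    }
    return fn.apply(value);
  }
}

// Hypothetical invoker: chooses the argument from the annotated method's parameter type.
final class Invoker {
  // Returns the argument actually passed to the annotated method.
  static Object invoke(Object fn, Object element, RowConverters converters) {
    for (Method m : fn.getClass().getMethods()) {
      if (!m.isAnnotationPresent(HandleElement.class)) {
        continue;
      }
      Class<?> wanted = m.getParameterTypes()[0];
      Object arg;
      if (wanted.isInstance(element)) {
        arg = element;                     // already the requested type: pass it through
      } else if (wanted == SimpleRow.class) {
        arg = converters.convert(element); // convert behind the scenes
      } else {
        throw new IllegalArgumentException("Cannot supply a " + wanted.getName());
      }
      try {
        m.invoke(fn, arg);
      } catch (IllegalAccessException | InvocationTargetException e) {
        throw new IllegalStateException(e);
      }
      return arg;
    }
    throw new IllegalArgumentException("No @HandleElement method on " + fn.getClass().getName());
  }
}
```

The user code never calls the converter; the choice is driven entirely by the parameter type, which is the property that lets the same DoFn ask for either the raw type or a Row.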
>
> All of this was detailed in the Schema doc I shared a few months ago. There was a lot of discussion on that document from various parties, and some of this API is a result of that discussion. This is also working in the branch JB and I were working on, though not yet integrated back to master.
>
> I would actually like to go further: make Row an interface and provide a way to automatically put a Row interface on top of any other object (e.g. JsonObject, POJO, etc.). This won't change the way the user writes code, but instead of Beam having to copy and convert at each stage (e.g. from JsonObject to Row), it will simply create a Row object that uses the JsonObject as its underlying storage.
>
> Reuven
>
> On Tue, May 22, 2018 at 11:37 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Well, Beam can implement a new mapper, but it doesn't help for IO. Most modern backends take JSON directly, even the javax one, and it must stay generic.
>
> Then, since JSON-to-POJO mapping has already been done a dozen times, I'm not sure it is worth it for now.
>
> On Tue, May 22, 2018 at 20:27 Reuven Lax <re...@google.com> wrote:
>
> We can do even better, btw: build a SchemaRegistry where automatic conversions can be registered between schemas and Java data types. With this, the user won't even need a DoFn to do the conversion.
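Reuven's idea of making Row an interface, so that a Row sits on top of a JsonObject or POJO instead of being copied from it, is essentially the adapter (view) pattern. A minimal sketch, assuming a plain `Map<String, Object>` as the underlying storage since `javax.json` is not part of the JDK; `RowView` and `MapBackedRow` are hypothetical names, not Beam types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RowViewDemo {
  public static void main(String[] args) {
    // A mutable map stands in for the underlying JsonObject.
    Map<String, Object> json = new HashMap<>(Map.of("id", 1, "name", "beam"));
    RowView row = new MapBackedRow(json); // wraps the storage; no field-by-field copy

    json.put("name", "Beam");
    System.out.println(row.getValue("name")); // prints Beam: the view reads the live storage
  }
}

// Hypothetical "Row as an interface": consumers only see named field accessors.
interface RowView {
  List<String> fieldNames();

  Object getValue(String fieldName);
}

// Hypothetical adapter that uses a Map as the row's underlying storage.
final class MapBackedRow implements RowView {
  private final Map<String, Object> storage;

  MapBackedRow(Map<String, Object> storage) {
    this.storage = storage; // keep a reference instead of copying values into a new row
  }

  @Override
  public List<String> fieldNames() {
    return List.copyOf(storage.keySet());
  }

  @Override
  public Object getValue(String fieldName) {
    if (!storage.containsKey(fieldName)) {
      throw new IllegalArgumentException("Unknown field: " + fieldName);
    }
    return storage.get(fieldName);
  }
}
```

The trade-off is that a view reflects later changes to its storage, as the demo shows, whereas a copied Row is a stable snapshot.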
>
> On Tue, May 22, 2018, 10:13 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Hi guys,
>
> I checked out what has been done on the schema model and think it is acceptable (regarding the JSON debate) if https://issues.apache.org/jira/browse/BEAM-4381 can be fixed.
>
> At a high level, it is about providing a mainstream, low-impact model OOTB, and JSON seems the most valid option for now, at least for IO and some user transforms.
>
> Wdyt?
>
> On Fri, Apr 27, 2018 at 18:36 Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Can give it a try at the end of May, sure (holidays and work constraints will make it hard before then).
>
> On Apr 27, 2018 18:26, "Anton Kedin" <ke...@google.com> wrote:
>
> Romain,
>
> I don't believe that the JSON approach was investigated very thoroughly. I mentioned a few reasons which make it not the best choice in my opinion, but I may be wrong. Can you put together a design doc or a prototype?
>
> Thank you,
> Anton
>
> On Thu, Apr 26, 2018 at 10:17 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> On Apr 26, 2018 23:13, "Anton Kedin" <ke...@google.com> wrote:
>>
>> BeamRecord (Row) has very little in common with JsonObject (I assume you're talking about javax.json), except maybe some similarities of the API. A few reasons why JsonObject doesn't work:
>>
>>   * it is a Java EE API:
>>       - Beam SDK is not limited to Java. There are probably similar APIs for other languages, but they might not necessarily carry the same semantics / APIs;
>
> Not a big deal I think. At least not a technical blocker.
>
>>       - it can change between Java versions;
>
> No, this is Java EE ;).
>
>>       - the current Beam Java implementation is an experimental feature to identify what's needed from such an API; in the end we might end up with something similar to the JsonObject API, but likely not;
>
> I don't see that point as a blocker.
>
>>   * it represents JSON, which is not an API but an object notation:
>>       - it is defined as a Unicode string in a certain format. If you choose to adhere to ECMA-404, then it doesn't sound like JsonObject can represent an Avro object, if I'm reading it right;
>
> That is in the generator implementation; you can implement an Avro generator.
>
>>   * it doesn't define a type system (JSON does, but it's lacking):
>>       - for example, JSON doesn't define semantics for numbers;
>>       - it doesn't define date/time types;
>>       - it doesn't allow extending the JSON type system at all;
>
> That is why you need a metadata object or, simpler, a schema with that data. JSON or BeamRecord doesn't help here, and you end up with the same outcome if you think about it.
>
>>   * it lacks schemas;
>
> JSON Schema is standard, widely used, and well tooled compared to the alternatives.
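Anton's type-system bullets and Romain's reply point at the same need: something outside the JSON values has to say what the numbers and date strings mean. A minimal sketch of such a schema, assuming a JSON parser that returns every number as `BigDecimal` and dates as ISO-8601 strings; `Schema`, `Field` and `FieldType` here are hypothetical types, neither Beam's `Schema` class nor JSON Schema.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TypedSchemaDemo {
  public static void main(String[] args) {
    Schema schema = new Schema(List.of(
        new Field("count", FieldType.INT64),
        new Field("ratio", FieldType.DOUBLE),
        new Field("ts", FieldType.DATETIME)));

    // Roughly what a generic JSON parser hands back: numbers as BigDecimal, dates as plain strings.
    Map<String, Object> untyped = Map.of(
        "count", new BigDecimal("42"),
        "ratio", new BigDecimal("0.5"),
        "ts", "2018-04-27T18:26:00Z");

    Map<String, Object> typed = schema.apply(untyped);
    System.out.println(typed); // prints {count=42, ratio=0.5, ts=2018-04-27T18:26:00Z}
    System.out.println(typed.get("count").getClass().getSimpleName()); // prints Long
  }
}

// Hypothetical field types: metadata that plain JSON values do not carry.
enum FieldType { INT64, DOUBLE, STRING, DATETIME }

record Field(String name, FieldType type) {}

record Schema(List<Field> fields) {
  // Coerces each declared field to the Java type its schema type implies.
  Map<String, Object> apply(Map<String, Object> untyped) {
    Map<String, Object> typed = new LinkedHashMap<>();
    for (Field f : fields) {
      Object raw = untyped.get(f.name());
      if (raw == null) {
        throw new IllegalArgumentException("Missing field: " + f.name());
      }
      Object value = switch (f.type()) {
        case INT64 -> Long.valueOf(((BigDecimal) raw).longValueExact());
        case DOUBLE -> Double.valueOf(((BigDecimal) raw).doubleValue());
        case STRING -> raw.toString();
        case DATETIME -> Instant.parse((String) raw);
      };
      typed.put(f.name(), value);
    }
    return typed;
  }
}
```

Whichever side wins the debate, the typing decision lives in this schema object rather than in the JSON (or Row) values themselves.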
>>
>> You can definitely try to loosen the requirements and define everything in JSON in userland, but the point of Row/Schema is to avoid that and define everything in the Beam model, which can be extended and mapped to JSON, Avro, BigQuery schemas, custom binary formats, etc., with the same semantics across Beam SDKs.
>
> This is what JSON-P would allow, with the benefit of natural POJO support through JSON-B.
>
> On Thu, Apr 26, 2018 at 12:28 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Just to make it clear and help me understand: how is BeamRecord different from a JsonObject, which is an API without an implementation (not even a JSON one OOTB)? The advantages of the JSON *API* are indeed natural mapping (JSON-B is based on JSON-P, so there is no new binding to reinvent) and simple serialization (JSON+gzip, for example, or Avro if you want to be geeky).
>
> I fail to see the point of rebuilding an ecosystem ATM.
>
> On Apr 26, 2018 19:12, "Reuven Lax" <re...@google.com> wrote:
>
> Exactly what JB said. We will write a generic conversion from Avro (or JSON) to Beam schemas, which will make them work transparently with SQL. The plan is also to migrate Anton's work so that POJOs work generically for any schema.
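Making POJOs work "generically for any schema" boils down to deriving field names, types and values by reflection instead of writing per-type conversion code. A minimal sketch, assuming Java 16+ records as the POJOs for brevity (a JavaBean version would walk getters instead); `inferSchema` and `toFieldValues` are hypothetical helpers, and the result is a plain map rather than a Beam `Row`.

```java
import java.lang.reflect.RecordComponent;
import java.util.LinkedHashMap;
import java.util.Map;

class PojoToRowDemo {
  public static void main(String[] args) {
    Order order = new Order("o-1", 1250L, true);
    System.out.println(inferSchema(Order.class)); // prints {id=String, amountCents=long, paid=boolean}
    System.out.println(toFieldValues(order));     // prints {id=o-1, amountCents=1250, paid=true}
  }

  // Infers a field-name -> type-name "schema" from the record components, in declaration order.
  static Map<String, String> inferSchema(Class<? extends Record> type) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (RecordComponent c : type.getRecordComponents()) {
      schema.put(c.getName(), c.getType().getSimpleName());
    }
    return schema;
  }

  // Extracts the field values generically: no per-type conversion code is written.
  static Map<String, Object> toFieldValues(Record value) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (RecordComponent c : value.getClass().getRecordComponents()) {
      try {
        row.put(c.getName(), c.getAccessor().invoke(value));
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
    return row;
  }
}

// Hypothetical user type; any record works the same way.
record Order(String id, long amountCents, boolean paid) {}
```

A real implementation would cache the reflected metadata per class (or generate accessors once, as Reuven mentions doing with ByteBuddy) rather than reflecting on every element.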
>
> Reuven
>
> On Thu, Apr 26, 2018 at 1:17 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> For now we have a generic schema interface. JSON-B can be one implementation; Avro could be another.
>
> Regards
> JB
>
> On Apr 26, 2018, at 12:08, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
> Hmm,
>
> Avro still has the pitfall of an uncontrolled stack that brings way too many dependencies to be part of any API. This is why I proposed a JSON-P-based API (JsonObject) with a custom Beam entry for some metadata (headers "à la Camel").
>
> 2018-04-26 9:59 GMT+02:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>
> Hi Ismael
>
> You mean directly in Beam SQL?
>
> That will be part of schema support: GenericRecord could be one of the payloads, with a schema across them.
>
> Regards
> JB
>
> On Apr 26, 2018, at 11:39, "Ismaël Mejía" <ieme...@gmail.com> wrote:
>
> Hello Anton,
>
> Thanks for the descriptive email and the really useful work. Any plans to tackle PCollections of GenericRecord/IndexedRecords? It seems Avro is a natural fit for this approach too.
>
> Regards,
> Ismaël
>
> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> wrote:
>
> Hi,
>
> I want to highlight a couple of improvements to Beam SQL we have been working on recently, which are targeted at making the Beam SQL API easier to use. Specifically, these features simplify conversion of Java Beans and JSON strings to Rows.
>
> Feel free to try this and send any bugs/comments/PRs my way.
>
> **Caveat: this is still a work in progress and has known bugs and incomplete features; see below for details.**
>
> Background
>
> Beam SQL queries can only be applied to PCollection<Row>. This means that users need to convert whatever PCollection elements they have to Rows before querying them with SQL. This usually requires