Awesome! We haven't wrapped any stateful processing API in Scala, but if you have a working snippet or ideas, it'd be great to share them in that ticket.
On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <car...@mrcalonso.com> wrote:

> Thanks Neville!!
>
> Your recommendation worked great. Thanks for your help!!
>
> As a side note, I found this issue:
> https://github.com/spotify/scio/issues/448
>
> I can share/help there with our experience, as our job, with scio +
> stateful + timely processing is working fine as of today
>
> Regards!!
>
> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <neville....@gmail.com> wrote:
>
>> Welcome.
>>
>> Added an issue so we may improve this in the future:
>> https://github.com/spotify/scio/issues/1020
>>
>>
>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> To build the beam transform I was following this example:
>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>
>>> To be honest I don't know how to apply timely and stateful processing
>>> without using a beam transform or how to rewrite it using the scio built-in
>>> you suggest. Could you please give me an example?
>>>
>>> Thanks for your help!
>>>
>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>>
>>>> That happens when you mix beam transforms into scio and defeats the
>>>> safety we have in place. Map the values into something beam-serializable
>>>> first or rewrite the transform with a scio built-in which takes care of
>>>> KvCoder.
>>>>
>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> I'm following this example:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>
>>>>> because I'm building something very similar to a group into batches
>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>> https://pastebin.com/xxdDMXSf
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>>> KryoAtomicCoder for most Scala types.
>>>>>> More details:
>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> May it be because I’m using
>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>> at some point in the pipeline
>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>> outputs a SerializableCoder)?
>>>>>>>
>>>>>>> This is something I've always wondered. How does one specify a coder
>>>>>>> for a case class?
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>>>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>>>>
>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone!!
>>>>>>>>>
>>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>>>>> stateful and timely processing and after building and testing the
>>>>>>>>> project locally I tried to run it on Google Dataflow and I started
>>>>>>>>> getting those errors.
>>>>>>>>>
>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>
>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>>>> defined as
>>>>>>>>> (content: String, attrs: Map[String, String])
>>>>>>>>>
>>>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>> Scio as well) which may suggest that the issue is in serializing the
>>>>>>>>> Map, but to be honest, I don't know what it means or how to fix it.
>>>>>>>>>
>>>>>>>>> Can anyone help me, please?
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
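
The advice quoted above, to map the values into something Beam-serializable before handing them to a Beam transform, can be sketched roughly as follows. This is a minimal illustration that uses only the Scala and Java standard libraries, not the code from the job discussed in the thread. SerializableCoder relies on plain Java serialization, so the sketch checks that directly. The names SerializationCheck, javaSerializable, normalize and Wrapped are hypothetical; only MessageWithAttributes and its fields come from the thread.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Same shape as the case class described in the thread.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical holder, used only to reproduce the failure mode: the case class
// itself is Serializable, but the instance stored in its field may not be.
case class Wrapped(payload: AnyRef)

object SerializationCheck {
  // True if plain Java serialization can write `value`.
  def javaSerializable(value: AnyRef): Boolean =
    Try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
    }.isSuccess

  // Rebuild attrs as a plain immutable Map. Going through toSeq forces a copy;
  // calling toMap directly on an immutable Map may just return the same instance.
  def normalize(m: MessageWithAttributes): MessageWithAttributes =
    m.copy(attrs = Map(m.attrs.toSeq: _*))

  def main(args: Array[String]): Unit = {
    val msg = MessageWithAttributes("body", Map("k" -> "v"))
    // The field value (a bare Object) is not Serializable, so writing fails.
    println(javaSerializable(Wrapped(new Object)))
    // A message whose attrs is a plain immutable Map can be written.
    println(javaSerializable(normalize(msg)))
  }
}
```

In a pipeline, a step like normalize would presumably run in a map over the elements just before the Beam transform is applied, so that no wrapper type such as the JMapWrapper named in the stack trace reaches the coder.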