Thanks, works like a charm! For such hidden gems there should be a Beam runner newbie guide ;-)
Thomas

On Thu, Jun 2, 2016 at 11:59 AM, Thomas Groh <tg...@google.com.invalid> wrote:

> The Beam Model ensures that all PCollections have a Coder; the PCollection
> Coder is the standard way to materialize the elements of a
> PCollection[1][2]. Most SDK-provided classes that will need to be
> transferred across the wire have an associated coder, and some additional
> default datatypes have coders associated with them (in the CoderRegistry[3]).
>
> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
> PCollection) and a WindowCoder (obtained from the WindowFn of the
> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
> can construct the FullWindowedValueCoder with the following code snippet:
>
> ```
> import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
>
> FullWindowedValueCoder.of(pc.getCoder(),
>     pc.getWindowingStrategy().getWindowFn().windowCoder())
> ```
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
> [2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
> [3] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
> [4] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515
>
> On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise <thomas.we...@gmail.com> wrote:
>
> > Hi Amit,
> >
> > Thanks for the help. I implemented the same serialization workaround for
> > the PipelineOptions. Since every distributed runner will have to solve
> > this, would it make sense to provide the serialization support along with
> > the interface proxy?
> >
> > Here is the exception I get with WindowedValue:
> >
> > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> >     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> >     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> >     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> >     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >
> > Thanks,
> > Thomas
> >
> > On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > Hi Thomas,
> > >
> > > Spark and the Spark runner are using Kryo for serialization and it seems to
> > > work just fine. What is your exact problem? Stack trace/message?
> > > I've hit an issue with Guava's ImmutableList/Map etc. and used
> > > https://github.com/magro/kryo-serializers for that.
> > >
> > > For PipelineOptions you can take a look at the Spark runner code here:
> > >
> > > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
> > >
> > > I'd be happy to assist with Kryo.
> > >
> > > Thanks,
> > > Amit
> > >
> > > On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm working on putting together a basic runner for Apache Apex.
> > > >
> > > > Hitting a couple of serialization-related issues when running tests. Apex
> > > > is using Kryo for serialization by default (and Kryo can delegate to other
> > > > serialization frameworks).
> > > >
> > > > The inner classes of WindowedValue are private and have no default
> > > > constructor, which the Kryo field serializer does not like. Also these
> > > > classes are not Java serializable, so that's not a fallback option (not
> > > > that it would be efficient anyways).
> > > >
> > > > What's the recommended technique to move the WindowedValues over the wire?
> > > >
> > > > Also, PipelineOptions aren't serializable, while most other classes are.
> > > > They are needed for example with DoFnRunnerBase, so what's the recommended
> > > > way to distribute them? Disassemble/reassemble? :)
> > > >
> > > > Thanks,
> > > > Thomas
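To make the FullWindowedValueCoder answer concrete without pulling in the Beam SDK, here is a minimal, stdlib-only sketch of the same composition: a coder for windowed elements assembled from a value coder and a window coder, with the timestamp written in between. Everything in it (`SimpleCoder`, `Windowed`, `IntervalWindow`, `WindowedCoder`) is hypothetical and is not the Beam API. The real FullWindowedValueCoder also encodes the element's collection of windows and its pane information; this sketch reduces that to a single window.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

class WindowedCoderSketch {
  // Hypothetical minimal coder contract; this is NOT org.apache.beam.sdk.coders.Coder.
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  // Hypothetical window: the half-open interval [start, end) in epoch millis.
  static final class IntervalWindow {
    final long start, end;
    IntervalWindow(long start, long end) { this.start = start; this.end = end; }
  }

  // Hypothetical windowed element: a value, its event timestamp and a single window.
  static final class Windowed<T, W> {
    final T value;
    final long timestampMillis;
    final W window;
    Windowed(T value, long timestampMillis, W window) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
    }
  }

  // Value coder: length-prefixed UTF-8 bytes.
  static final class Utf8Coder implements SimpleCoder<String> {
    public void encode(String value, DataOutputStream out) throws IOException {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length);
      out.write(bytes);
    }
    public String decode(DataInputStream in) throws IOException {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  // Window coder: the two interval bounds as longs (Java evaluates arguments left to right).
  static final class IntervalWindowCoder implements SimpleCoder<IntervalWindow> {
    public void encode(IntervalWindow window, DataOutputStream out) throws IOException {
      out.writeLong(window.start);
      out.writeLong(window.end);
    }
    public IntervalWindow decode(DataInputStream in) throws IOException {
      return new IntervalWindow(in.readLong(), in.readLong());
    }
  }

  // Composite coder: value and window use their own coders; the timestamp sits in between.
  static final class WindowedCoder<T, W> implements SimpleCoder<Windowed<T, W>> {
    private final SimpleCoder<T> valueCoder;
    private final SimpleCoder<W> windowCoder;
    private WindowedCoder(SimpleCoder<T> valueCoder, SimpleCoder<W> windowCoder) {
      this.valueCoder = valueCoder; this.windowCoder = windowCoder;
    }
    static <T, W> WindowedCoder<T, W> of(SimpleCoder<T> valueCoder, SimpleCoder<W> windowCoder) {
      return new WindowedCoder<>(valueCoder, windowCoder);
    }
    public void encode(Windowed<T, W> element, DataOutputStream out) throws IOException {
      valueCoder.encode(element.value, out);
      out.writeLong(element.timestampMillis);
      windowCoder.encode(element.window, out);
    }
    public Windowed<T, W> decode(DataInputStream in) throws IOException {
      T value = valueCoder.decode(in);
      long timestampMillis = in.readLong();
      return new Windowed<>(value, timestampMillis, windowCoder.decode(in));
    }
    // Byte-array helpers a runner could call from its own serializer hook.
    byte[] toBytes(Windowed<T, W> element) {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try {
        encode(element, new DataOutputStream(bytes));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }
    Windowed<T, W> fromBytes(byte[] data) {
      try {
        return decode(new DataInputStream(new ByteArrayInputStream(data)));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args) {
    WindowedCoder<String, IntervalWindow> coder = WindowedCoder.of(new Utf8Coder(), new IntervalWindowCoder());
    byte[] wire = coder.toBytes(new Windowed<>("hello", 1_000L, new IntervalWindow(0L, 60_000L)));
    Windowed<String, IntervalWindow> copy = coder.fromBytes(wire);
    System.out.println(wire.length + " bytes -> " + copy.value + " @ " + copy.timestampMillis); // 33 bytes -> hello @ 1000
  }
}
```

Decoding rebuilds the element through its constructor, driven by the coders, so nothing in this path ever asks a reflection-based serializer to instantiate the element class on its own. That is the property that makes the coder route a way around the Kryo error quoted in the thread.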
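The Kryo error in the thread ("missing no-arg constructor") comes from FieldSerializer needing to create an empty instance before it can fill in fields one by one. The stdlib-only sketch below roughly mirrors that lookup with plain reflection (`Class.getDeclaredConstructor()` with no parameter types) and also checks the `Serializable` marker that plain Java serialization requires. `TimestampedValue` and `MutableHolder` are hypothetical stand-ins, not the Beam classes; `TimestampedValue` is only shaped like WindowedValue's inner classes (final fields, a single parameterized constructor, not Serializable).

```java
import java.io.Serializable;

class NoArgConstructorCheck {
  // Hypothetical stand-in shaped like WindowedValue's inner value classes:
  // final fields, a single parameterized constructor, and no Serializable marker.
  static final class TimestampedValue {
    private final String value;
    private final long timestampMillis;

    private TimestampedValue(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // Hypothetical bean-style type that a field-by-field serializer can instantiate.
  static final class MutableHolder {
    String value;
    long timestampMillis;

    MutableHolder() {}
  }

  // Roughly the lookup a reflection-based instantiator performs before it can
  // create an empty object and populate its fields.
  static boolean hasNoArgConstructor(Class<?> type) {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Plain Java serialization only accepts types that implement Serializable.
  static boolean isJavaSerializable(Class<?> type) {
    return Serializable.class.isAssignableFrom(type);
  }

  public static void main(String[] args) {
    for (Class<?> type : new Class<?>[] {TimestampedValue.class, MutableHolder.class}) {
      System.out.println(type.getSimpleName()
          + ": no-arg constructor=" + hasNoArgConstructor(type)
          + ", Serializable=" + isJavaSerializable(type));
    }
  }
}
```

For a type shaped like `TimestampedValue`, the usual Kryo-side options are registering a dedicated serializer for it (for instance one that delegates to the PCollection's coder, as suggested in the thread) or configuring an instantiator strategy that bypasses constructors, such as Objenesis' `StdInstantiatorStrategy`. Check the Kryo version in use for the exact API.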
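For the PipelineOptions question, the "disassemble/reassemble" workaround discussed in the thread comes down to keeping a serialized text form inside a Serializable holder and rebuilding the options lazily on the receiving side. The sketch below shows only that shape, under stated assumptions: `RunnerOptions` and `SerializableOptions` are hypothetical, `java.util.Properties` text is swapped in for whatever format a real runner uses (the SparkRuntimeContext code linked in the thread is the reference for that), and the option keys and values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

class SerializableOptionsSketch {
  // Hypothetical options object: deliberately NOT Serializable, like a PipelineOptions proxy.
  static final class RunnerOptions {
    private final Properties values = new Properties();

    RunnerOptions set(String key, String value) {
      values.setProperty(key, value);
      return this;
    }

    String get(String key) {
      return values.getProperty(key);
    }
  }

  // Hypothetical holder: ships a text form of the options and rebuilds them on first access
  // after deserialization.
  static final class SerializableOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String serializedForm; // the only field that travels over the wire
    private transient RunnerOptions options; // null after deserialization until get()

    SerializableOptions(RunnerOptions options) {
      this.options = options;
      StringWriter text = new StringWriter();
      try {
        options.values.store(text, null);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      this.serializedForm = text.toString();
    }

    RunnerOptions get() {
      if (options == null) {
        RunnerOptions rebuilt = new RunnerOptions();
        try {
          rebuilt.values.load(new StringReader(serializedForm));
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        options = rebuilt;
      }
      return options;
    }
  }

  // Java-serialization round trip, standing in for "ship the object to a worker".
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    RunnerOptions options = new RunnerOptions().set("runner", "ApexRunner").set("parallelism", "4");
    SerializableOptions shipped = roundTrip(new SerializableOptions(options));
    System.out.println(shipped.get().get("runner") + ", parallelism=" + shipped.get().get("parallelism"));
  }
}
```

Marking the live object `transient` keeps Java serialization from touching the non-serializable type, and the lazy `get()` means each deserialized copy rebuilds its options at most once.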