The Beam Model ensures that all PCollections have a Coder; the PCollection
Coder is the standard way to materialize the elements of a
PCollection[1][2]. Most SDK-provided classes that need to be transferred
across the wire have an associated Coder, and some additional default
datatypes have Coders registered in the CoderRegistry[3].

FullWindowedValueCoder[4] can encode and decode an entire WindowedValue. It
is constructed from a ValueCoder (obtained from the PCollection) and a
WindowCoder (obtained from the WindowFn of the WindowingStrategy of the
PCollection). Given an input PCollection `pc`, you can construct the
FullWindowedValueCoder with the following code snippet:

```
FullWindowedValueCoder.of(
    pc.getCoder(),
    pc.getWindowingStrategy().getWindowFn().windowCoder())
```
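
To illustrate why a Coder avoids the Kryo "missing no-arg constructor"
problem, here is a rough, self-contained sketch of the same composition
pattern. It does not use the Beam classes: `SimpleCoder`, `Windowed`,
`windowedCoder`, `toBytes` and `fromBytes` are hypothetical stand-ins made up
for this example, the wire layout is invented, and a single String window
replaces Beam's window collection and pane info. The point is only that
decoding goes through an ordinary constructor instead of reflection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WindowedValueCoderSketch {

  /** Hypothetical stand-in for a Coder: explicit encode/decode, no reflection. */
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  /** Hypothetical stand-in for a windowed value: immutable, no no-arg constructor. */
  static final class Windowed<T, W> {
    final T value;
    final long timestampMillis;
    final W window;

    Windowed(T value, long timestampMillis, W window) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.window = window;
    }
  }

  /** Length-prefixed UTF-8 string coder (layout invented for this sketch). */
  static final SimpleCoder<String> STRING_CODER = new SimpleCoder<String>() {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length);
      out.write(bytes);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    }
  };

  /** Composes a value coder and a window coder, like of(valueCoder, windowCoder). */
  static <T, W> SimpleCoder<Windowed<T, W>> windowedCoder(
      final SimpleCoder<T> valueCoder, final SimpleCoder<W> windowCoder) {
    return new SimpleCoder<Windowed<T, W>>() {
      @Override
      public void encode(Windowed<T, W> wv, DataOutputStream out) throws IOException {
        valueCoder.encode(wv.value, out);
        out.writeLong(wv.timestampMillis);
        windowCoder.encode(wv.window, out);
      }

      @Override
      public Windowed<T, W> decode(DataInputStream in) throws IOException {
        T value = valueCoder.decode(in);
        long timestampMillis = in.readLong();
        W window = windowCoder.decode(in);
        // The object is rebuilt through its regular constructor.
        return new Windowed<T, W>(value, timestampMillis, window);
      }
    };
  }

  /** Encodes a value to a byte array using the given coder. */
  static <T> byte[] toBytes(SimpleCoder<T> coder, T value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      coder.encode(value, out);
    }
    return bytes.toByteArray();
  }

  /** Decodes a value from a byte array using the given coder. */
  static <T> T fromBytes(SimpleCoder<T> coder, byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return coder.decode(in);
    }
  }

  public static void main(String[] args) throws IOException {
    SimpleCoder<Windowed<String, String>> coder = windowedCoder(STRING_CODER, STRING_CODER);
    Windowed<String, String> original = new Windowed<String, String>("hello", 1464889260000L, "global");
    byte[] wire = toBytes(coder, original);
    Windowed<String, String> copy = fromBytes(coder, wire);
    System.out.println(copy.value + " @ " + copy.timestampMillis + " in " + copy.window);
  }
}
```

In a runner, the equivalent step would presumably be to hand the bytes
produced by the real FullWindowedValueCoder to the runner's serialization
layer, rather than letting that layer reflect over WindowedValue's private
inner classes.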

[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
[2] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
[3] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
[4] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515

On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise <thomas.we...@gmail.com>
wrote:

> Hi Amit,
>
> Thanks for the help. I implemented the same serialization workaround for
> the PipelineOptions. Since every distributed runner will have to solve
> this, would it make sense to provide the serialization support along with
> the interface proxy?
>
> Here is the exception I get with WindowedValue:
>
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>
> Thanks,
> Thomas
>
>
> On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> > Hi Thomas,
> >
> > Spark and the Spark runner are using Kryo for serialization and it
> > seems to work just fine. What is your exact problem? Stack
> > trace/message?
> > I've hit an issue with Guava's ImmutableList/Map etc. and used
> > https://github.com/magro/kryo-serializers for that.
> >
> > For PipelineOptions you can take a look at the Spark runner code here:
> >
> > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
> >
> > I'd be happy to assist with Kryo.
> >
> > Thanks,
> > Amit
> >
> > On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > I'm working on putting together a basic runner for Apache Apex.
> > >
> > > Hitting a couple of serialization related issues with running tests.
> > > Apex is using Kryo for serialization by default (and Kryo can delegate
> > > to other serialization frameworks).
> > >
> > > The inner classes of WindowedValue are private and have no default
> > > constructor, which the Kryo field serializer does not like. Also these
> > > classes are not Java serializable, so that's not a fallback option (not
> > > that it would be efficient anyways).
> > >
> > > What's the recommended technique to move the WindowedValues over the
> > > wire?
> > >
> > > Also, PipelineOptions aren't serializable, while most other classes
> > > are. They are needed for example with DoFnRunnerBase, so what's the
> > > recommended way to distribute them? Disassemble/reassemble? :)
> > >
> > > Thanks,
> > > Thomas
> > >
> >
>
