Oh sorry, of course I meant Thomas Groh in my previous email. But @Thomas
Weise, this example
<https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L108>
might help; it shows how the Spark runner uses Coders the way Thomas Groh
described.
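
For a rough idea of the pattern (a sketch only, not the exact code at that
link), this is how a runner can round-trip a PCollection's elements through
its Coder using the SDK's CoderUtils:

```
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;

// Illustrative only: encode an element of a PCollection to bytes and back,
// the way a runner materializes elements before shipping them over the wire.
public class CoderRoundTrip {
  public static <T> T roundTrip(PCollection<T> pc, T element) throws Exception {
    Coder<T> coder = pc.getCoder();
    byte[] bytes = CoderUtils.encodeToByteArray(coder, element); // serialize
    return CoderUtils.decodeFromByteArray(coder, bytes);         // deserialize
  }
}
```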

And I agree that we should consider making PipelineOptions Serializable, or
providing a generic serialization solution for Runners.
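
A minimal sketch of the kind of generic helper I mean (the class name is
made up, and it assumes Jackson's ObjectMapper can round-trip PipelineOptions
through JSON, which is the trick the Spark runner's SparkRuntimeContext
relies on):

```
import java.io.Serializable;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical holder (name is mine, not from Beam): keeps PipelineOptions as
// a JSON string so the holder itself is Java-serializable.
public class SerializedPipelineOptions implements Serializable {
  private final String json;

  public SerializedPipelineOptions(PipelineOptions options) {
    try {
      this.json = new ObjectMapper().writeValueAsString(options); // options -> JSON
    } catch (Exception e) {
      throw new RuntimeException("Failed to serialize PipelineOptions", e);
    }
  }

  public PipelineOptions get() {
    try {
      return new ObjectMapper().readValue(json, PipelineOptions.class); // JSON -> options
    } catch (Exception e) {
      throw new RuntimeException("Failed to deserialize PipelineOptions", e);
    }
  }
}
```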

Hope this helps,
Amit

On Thu, Jun 2, 2016 at 10:35 PM Amit Sela <amitsel...@gmail.com> wrote:

> Thomas is right, though in my case I encountered this issue when using
> Spark's new API that uses Encoders
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala>
> not just for serialization but also for "translating" the object into a
> schema for optimized execution with Tungsten
> <https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>.
>
> In this case I'm using Kryo, and I've solved this by registering (in Spark,
> not Beam) custom serializers from
> https://github.com/magro/kryo-serializers.
> I would consider implementing Encoders with the help of Coders in the
> future, but I haven't wrapped my head around that yet.
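>
> For reference, registering those serializers in Spark looks roughly like
> this (a sketch only; the registrator class name is mine, and it assumes the
> de.javakaffee kryo-serializers artifact is on the classpath):
>
> ```
> import com.esotericsoftware.kryo.Kryo;
> import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
> import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
> import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
> import org.apache.spark.serializer.KryoRegistrator;
>
> // Hypothetical registrator: teaches Spark's Kryo how to handle Guava's
> // ImmutableList/ImmutableMap and the JDK's unmodifiable collections.
> public class BeamKryoRegistrator implements KryoRegistrator {
>   @Override
>   public void registerClasses(Kryo kryo) {
>     UnmodifiableCollectionsSerializer.registerSerializers(kryo);
>     ImmutableListSerializer.registerSerializers(kryo);
>     ImmutableMapSerializer.registerSerializers(kryo);
>   }
> }
> ```
>
> You then point spark.kryo.registrator at that class in the SparkConf.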
>
> On Thu, Jun 2, 2016 at 9:59 PM Thomas Groh <tg...@google.com.invalid>
> wrote:
>
>> The Beam Model ensures that all PCollections have a Coder; the PCollection
>> Coder is the standard way to materialize the elements of a
>> PCollection[1][2]. Most SDK-provided classes that need to be transferred
>> across the wire have an associated coder, and some additional default
>> datatypes have coders associated with them (registered in the
>> CoderRegistry[3]).
>>
>> FullWindowedValueCoder[4] is capable of encoding and decoding the entirety
>> of a WindowedValue, and is constructed from a ValueCoder (obtained from the
>> PCollection) and a WindowCoder (obtained from the WindowFn of the
>> WindowingStrategy of the PCollection). Given an input PCollection `pc`, you
>> can construct the FullWindowedValueCoder with the following code snippet:
>>
>> ```
>> FullWindowedValueCoder.of(
>>     pc.getCoder(),
>>     pc.getWindowingStrategy().getWindowFn().windowCoder())
>> ```
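>>
>> For example (a sketch, not from the codebase), a round trip through that
>> coder with CoderUtils would look something like:
>>
>> ```
>> // Assumes `pc` is a PCollection<String>; the coder covers the value, its
>> // timestamp, its windows and the pane info together.
>> FullWindowedValueCoder<String> windowedCoder =
>>     FullWindowedValueCoder.of(
>>         pc.getCoder(),
>>         pc.getWindowingStrategy().getWindowFn().windowCoder());
>> WindowedValue<String> original = WindowedValue.valueInGlobalWindow("hello");
>> byte[] bytes = CoderUtils.encodeToByteArray(windowedCoder, original);
>> WindowedValue<String> copy = CoderUtils.decodeFromByteArray(windowedCoder, bytes);
>> ```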
>>
>> [1]
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
>> [2]
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L130
>> [3]
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java#L94
>> [4]
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L515
>>
>> On Thu, Jun 2, 2016 at 10:41 AM, Thomas Weise <thomas.we...@gmail.com>
>> wrote:
>>
>> > Hi Amit,
>> >
>> > Thanks for the help. I implemented the same serialization workaround for
>> > the PipelineOptions. Since every distributed runner will have to solve
>> > this, would it make sense to provide the serialization support along with
>> > the interface proxy?
>> >
>> > Here is the exception I get with WindowedValue:
>> >
>> > com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
>> >     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>> >     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>> >     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>> >     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>> >     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> >
>> > Thanks,
>> > Thomas
>> >
>> >
>> > On Wed, Jun 1, 2016 at 12:45 AM, Amit Sela <amitsel...@gmail.com>
>> wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > Spark and the Spark runner are using Kryo for serialization and it
>> > > seems to work just fine. What is your exact problem? Stack trace/message?
>> > > I've hit an issue with Guava's ImmutableList/Map etc. and used
>> > > https://github.com/magro/kryo-serializers for that.
>> > >
>> > > For PipelineOptions you can take a look at the Spark runner code here:
>> > > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java#L73
>> > >
>> > > I'd be happy to assist with Kryo.
>> > >
>> > > Thanks,
>> > > Amit
>> > >
>> > > On Wed, Jun 1, 2016 at 7:10 AM Thomas Weise <t...@apache.org> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'm working on putting together a basic runner for Apache Apex.
>> > > >
>> > > > Hitting a couple of serialization-related issues when running tests.
>> > > > Apex uses Kryo for serialization by default (and Kryo can delegate to
>> > > > other serialization frameworks).
>> > > >
>> > > > The inner classes of WindowedValue are private and have no default
>> > > > constructor, which the Kryo field serializer does not like. Also,
>> > > > these classes are not Java serializable, so that's not a fallback
>> > > > option (not that it would be efficient anyway).
>> > > >
>> > > > What's the recommended technique to move the WindowedValues over the
>> > > > wire?
>> > > >
>> > > > Also, PipelineOptions aren't serializable, while most other classes
>> > > > are. They are needed, for example, with DoFnRunnerBase, so what's the
>> > > > recommended way to distribute them? Disassemble/reassemble? :)
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > >
>> >
>>
>
