Thank you for your replies.

Lukasz,

Kryo does not fail on the WindowedValue itself but on the user class it wraps, which in some cases is not Kryo-serializable, usually because it has no default constructor. Switching to Java serialization then fails on WindowedValue, since it is not declared Serializable.
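To make the failure mode concrete, here is a minimal sketch with a made-up user type (SensorReading is hypothetical, purely for illustration). Kryo's default FieldSerializer happily writes such an object but cannot re-create it on read:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class KryoNoArgCtorDemo {

      // Hypothetical user type: no no-arg constructor, not java.io.Serializable.
      static class SensorReading {
        private String id;
        SensorReading(String id) { this.id = id; }
      }

      public static void main(String[] args) {
        Kryo kryo = new Kryo();

        // Writing succeeds: FieldSerializer just walks the fields reflectively.
        Output output = new Output(1024);
        kryo.writeObject(output, new SensorReading("a"));
        output.close();

        // Reading throws, because the default instantiator strategy needs a
        // zero-argument constructor to create the instance, e.g.:
        //   KryoException: Class cannot be created (missing no-arg constructor)
        kryo.readObject(new Input(output.toBytes()), SensorReading.class);

        // Kryo itself can be told to bypass constructors via Objenesis:
        //   kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
        // but how Spark configures its Kryo instances is version-dependent,
        // and in practice users still hit this on cached RDDs.
      }
    }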
Thomas,

We thought about writing a custom serializer, but it cannot work without knowing the exact coder of each RDD (a sketch of what such a serializer might look like follows the quoted thread below). This is similar to the past discussion about whether it is mandatory for the user to set a coder for each PCollection or not: the suggestion is effectively asking Beam to auto-detect coders for all PCollections.

Ben,

Spark uses Java serialization to send closures, and that will not work if the cluster contains different JVMs that are not compatible with each other.

We agree with you all that it would be more efficient to use coders for caching, but that means a major rewrite of the Spark runner, changing almost all method signatures and internal RDDs from a specific type to byte[]. As I said before, this is a fallback for users who have non-serializable user classes. Even today, if an RDD is cached with Kryo it doesn't pass through the coder, so we are not worsening the situation but enabling more users to use the Spark runner.

Thanks
Kobi

On Aug 24, 2017 at 20:37, "Thomas Weise" <[email protected]> wrote:

> Would a custom Kryo serializer that uses the coders to perform
> serialization help?
>
> There are various ways Kryo lets you annotate such a serializer without
> full surgery, including @Bind at the field level or at the class level.
>
> Thanks,
> Thomas
>
>
> On Thu, Aug 24, 2017 at 9:57 AM, Lukasz Cwik <[email protected]> wrote:
>
> > How does Kryo's FieldSerializer fail on WindowedValue/PaneInfo? Those
> > seem like pretty simple types.
> >
> > Also, I don't see why they can't be tagged with Serializable, but I
> > think the original reasoning was that coders should be used to
> > guarantee a stable representation across JVM versions.
> >
> > On Thu, Aug 24, 2017 at 5:38 AM, Kobi Salant <[email protected]> wrote:
> >
> > > Hi All,
> > >
> > > I am working on Spark runner issue
> > > https://issues.apache.org/jira/browse/BEAM-2669,
> > > "Kryo serialization exception when DStreams containing
> > > non-Kryo-serializable data are cached"
> > >
> > > Currently, the Spark runner enforces the Kryo serializer, and in
> > > shuffling scenarios we always use coders to transfer bytes over the
> > > network. But Spark can also serialize data for caching/persisting,
> > > and currently it uses Kryo for this as well.
> > >
> > > Today, when the user uses a class which is not Kryo-serializable,
> > > the caching fails, and we thought to open the option of using Java
> > > serialization as a fallback.
> > >
> > > The RDDs/DStreams behind our PCollections are usually typed
> > > RDD/DStream<WindowedValue<InputT>>, and when Spark tries to
> > > Java-serialize them for caching purposes it fails on WindowedValue
> > > not being Java-serializable.
> > >
> > > Is there any objection to adding "implements Serializable" to SDK
> > > classes like WindowedValue, PaneInfo and others?
> > >
> > > Again, I want to emphasise that coders are a big part of the Spark
> > > runner code, and we do use them whenever a shuffle is expected.
> > > Changing the types of the RDDs/DStreams to byte[] would make the
> > > code pretty unreadable and would weaken type-safety checks.
> > >
> > > Thanks
> > > Kobi
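For concreteness, here is a rough sketch of the kind of coder-backed Kryo serializer Thomas describes (the class name and wiring are illustrative, not actual runner code). Note that it only works once a Coder<T> is already in hand, which is exactly the per-RDD knowledge that is missing:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.util.CoderUtils;

    // Illustrative only: delegate Kryo (de)serialization of T to a Beam Coder.
    // Kryo resolves serializers by class, so someone still has to construct
    // this with the right Coder<T> for every type that may get cached.
    public class CoderBackedSerializer<T> extends Serializer<T> {
      private final Coder<T> coder;

      public CoderBackedSerializer(Coder<T> coder) {
        this.coder = coder;
      }

      @Override
      public void write(Kryo kryo, Output output, T value) {
        try {
          // Length-prefix the coder's bytes so read() knows where they end.
          byte[] bytes = CoderUtils.encodeToByteArray(coder, value);
          output.writeInt(bytes.length, true);
          output.writeBytes(bytes);
        } catch (CoderException e) {
          throw new RuntimeException("Beam coder failed to encode value", e);
        }
      }

      @Override
      public T read(Kryo kryo, Input input, Class<T> type) {
        try {
          byte[] bytes = input.readBytes(input.readInt(true));
          return CoderUtils.decodeFromByteArray(coder, bytes);
        } catch (CoderException e) {
          throw new RuntimeException("Beam coder failed to decode value", e);
        }
      }
    }

Registering it per class, e.g. kryo.register(MyUserType.class, new CoderBackedSerializer<>(myCoder)) (both names hypothetical), lands us back at the same question: where does myCoder come from for each cached RDD?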
