[ https://issues.apache.org/jira/browse/TINKERPOP-1341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355382#comment-15355382 ]

Dan LaRocque commented on TINKERPOP-1341:
-----------------------------------------

In my preceding comments, I tried to figure out which classes I had missed by 
reading the relevant TinkerPop, Spark, and Chill source.  This comment is about 
approaching the problem from the opposite direction: instantiate a pair of new 
and old serializers, then try to programmatically determine which classes are 
registered with the old but not with the new.

This is not completely straightforward.   As far as I know, I can't invoke some 
handy public Kryo method that returns a {{Set<Class>}} of registrations and 
compare old vs new.  Kryo instances abstract the idea of registered-ness behind 
the ClassResolver interface.   {{ClassResolver.getRegistration(Class)}} is the 
method most of Kryo calls to figure out whether a class is registered.  This is 
sort of a registered-ness predicate in that it returns a {{Registration}} 
instance for those classes which are registered and null for those which are 
not.
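The registered-ness diff described above amounts to a set difference over each resolver's registered classes. Here is a toy sketch of that idea, with plain maps standing in for each Kryo instance's internal {{classToRegistration}} state (an assumption about internals; the real {{ClassResolver}} API only exposes {{getRegistration(Class)}}):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model: each resolver's registered classes are represented as a map
// from Class to a registration id, standing in for Kryo's internal
// classToRegistration map.
public class RegistrationDiff {
    public static void main(String[] args) {
        Map<Class<?>, Integer> oldResolver = new HashMap<>();
        oldResolver.put(String.class, 1);
        oldResolver.put(Void.class, 2);
        oldResolver.put(int[].class, 3);

        Map<Class<?>, Integer> newResolver = new HashMap<>();
        newResolver.put(String.class, 1);

        // Classes registered with the old resolver but not the new one.
        Set<Class<?>> oldOnly = new HashSet<>(oldResolver.keySet());
        oldOnly.removeAll(newResolver.keySet());

        System.out.println(oldOnly.contains(Void.class));    // registered only under "old"
        System.out.println(oldOnly.contains(String.class));  // registered under both, so excluded
    }
}
```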

TinkerPop's old serialization code has a custom ClassResolver impl called 
{{GryoClassResolver}}. Its internals look quite a bit like those of Kryo's 
{{DefaultClassResolver}}, down to field names and its memoization technique.  
At first glance, one might think we could just compare the contents of the map 
named {{classToRegistration}} in {{GryoClassResolver}} (under the old system) 
vs the same-named map in {{DefaultClassResolver}} (under the new system).

This is a useful comparison, but an imperfect one.  It does not account for 
the five case statements in {{GryoClassResolver.getRegistration}} that consider 
effectively all subtypes of Vertex, Edge, Property, VertexProperty, and Path to 
be registered, and which work in concert with custom serializers that coerce 
instances of such types into detached equivalents.  In other words, there's 
some logic in the old serializer code's {{GryoClassResolver.getRegistration}} 
method that bypasses {{classToRegistration}}.  I think all affected classes are 
registered by the new serialization code's 
{{GryoRegistrator.getExtraRegistrations}}, which explicitly registers each of 
the relevant subtypes that TP needs (e.g. HadoopVertex, ComputerEdge, 
StarProperty, ...), obviating the custom resolver.  Although that set of 
registrations was sufficient for the test suite, it is theoretically possible 
that {{GryoRegistrator.getExtraRegistrations}} omits some subtype of 
Vertex/Prop/etc.  Such an omission would not be apparent in a comparison of 
{{classToRegistration}} maps.
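The caveat can be illustrated with a toy resolver whose predicate answers "registered" for subtypes that never appear in its map. The names here ({{Vertex}} marker, registration id 100) are illustrative stand-ins, not TinkerPop's actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the caveat: a resolver whose getRegistration() treats every
// subtype of a marker interface as registered, without ever putting those
// subtypes into its classToRegistration map. A diff of the maps alone
// therefore under-reports what the old resolver considers registered.
public class DynamicResolver {
    interface Vertex {}                          // stand-in for a TP structure interface
    static class SomeVertexImpl implements Vertex {}

    static final Map<Class<?>, Integer> classToRegistration = new HashMap<>();

    // Mirrors the shape of the old resolver's logic: a case statement
    // answers "registered" for Vertex subtypes even though the map has
    // no entry for them.
    static Integer getRegistration(Class<?> type) {
        if (Vertex.class.isAssignableFrom(type)) {
            return 100;  // coerced to a shared (e.g. detached) registration
        }
        return classToRegistration.get(type);    // null means unregistered
    }

    public static void main(String[] args) {
        // The map says "not registered" ...
        System.out.println(classToRegistration.containsKey(SomeVertexImpl.class));
        // ... but the predicate says "registered": a map diff misses this.
        System.out.println(getRegistration(SomeVertexImpl.class) != null);
    }
}
```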
 
Despite that caveat, I still think it's worth comparing {{classToRegistration}} 
on new vs old.  Here's that comparison.
 
{noformat}
// Get set of classes registered under old system
gremlin> sc = new org.apache.spark.SparkConf(true)
==>org.apache.spark.SparkConf@285dc41f
gremlin> gs = new org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer(sc)
==>org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer@4f99a5d8
gremlin> oldClassResolverKeys = gs.getGryoPool().takeKryo().getClassResolver().classToRegistration.keys().toSet() ; []
// Get set of classes registered under new system
gremlin> sc.set('spark.kryo.registrator', 'org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator')
==>org.apache.spark.SparkConf@285dc41f
gremlin> sc.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
==>org.apache.spark.SparkConf@285dc41f
gremlin> ks = new org.apache.spark.serializer.KryoSerializer(sc)
==>org.apache.spark.serializer.KryoSerializer@12670955
gremlin> newClassResolverKeys = ks.newKryo().getClassResolver().classToRegistration.keys().toSet() ; []
// Show classes registered in the old system but not in the new one
gremlin> oldClassResolverKeys - oldClassResolverKeys.intersect(newClassResolverKeys)
==>void
==>class scala.reflect.ClassTag$$anon$1
==>class java.lang.Void
==>class [Lorg.apache.spark.util.collection.CompactBuffer;
==>class scala.reflect.ManifestFactory$$anon$1
{noformat}
 
{{ManifestFactory$$anon$1}} and {{ClassTag$$anon$1}} are pathological cases 
that apparently only come up in testing.  Setting the system property 
{{is.testing=true}} activates a condition in {{GryoRegistrator}} that makes it 
register them, and they then disappear from the old-only set:
 
{noformat}
gremlin> System.setProperty('is.testing', 'true')
==>null
gremlin> ks = new org.apache.spark.serializer.KryoSerializer(sc)
==>org.apache.spark.serializer.KryoSerializer@4aefcefb
gremlin> newClassResolverKeys = ks.newKryo().getClassResolver().classToRegistration.keys().toSet() ; []
gremlin> oldClassResolverKeys - oldClassResolverKeys.intersect(newClassResolverKeys)
==>void
==>class java.lang.Void
==>class [Lorg.apache.spark.util.collection.CompactBuffer;
gremlin>
{noformat}
 
This leaves void, Void, and CompactBuffer[] (note leading "[L").  I'll add Void 
and void alongside CompactBuffer[] in my PR.
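The leading "[L" is the JVM's internal name for an object-array class. An array class is a distinct {{Class}} object from its element class, which is why registering {{CompactBuffer}} does not cover {{CompactBuffer[]}}. A quick stdlib demonstration, using {{String}} rather than {{CompactBuffer}} so it runs without Spark on the classpath:

```java
// The JVM names an object-array class "[L<element>;", and treats it as a
// class of its own: registering the element type says nothing about the
// array type.
public class ArrayClassName {
    public static void main(String[] args) {
        System.out.println(String[].class.getName());            // the "[L...;" form
        System.out.println(String[].class.equals(String.class)); // distinct Class objects
    }
}
```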
 
To check my assertion that {{CompactBuffer}} itself is registered under both 
systems, but that arrays of {{CompactBuffer}} are registered only under the 
old one:

{noformat}
gremlin> oldClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer.class)
==>true
gremlin> oldClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer[].class)
==>true
gremlin> newClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer.class)
==>true
gremlin> newClassResolverKeys.contains(org.apache.spark.util.collection.CompactBuffer[].class)
==>false
{noformat}
 
It was interesting to see that BoxedUnit is apparently already getting 
registered somewhere.  This might be happening in the chill library that Spark 
uses, or it might be happening somewhere in Spark proper that I overlooked.  I 
won't need to register it in my PR.
 
{noformat}
gremlin> newClassResolverKeys.contains(scala.runtime.BoxedUnit.class)
==>true
{noformat}

> UnshadedKryoAdapter fails to deserialize StarGraph when SparkConf sets 
> spark.rdd.compress=true whereas GryoSerializer works
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: TINKERPOP-1341
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1341
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: io
>    Affects Versions: 3.2.1, 3.3.0
>            Reporter: Dylan Bethune-Waddell
>            Priority: Minor
>
> When trying to bulk load a large dataset into Titan, I was running into OOM 
> errors and decided to try tweaking some Spark configuration settings. With 
> the new GryoRegistrator/UnshadedKryo serialization shim in master, I first 
> hit exceptions a few hundred tasks into the edge loading stage (stage 5) 
> complaining that CompactBuffer[].class must be explicitly registered with 
> Kryo; with spark.rdd.compress=true set, the job instead fails a few hundred 
> tasks into the vertex loading stage (stage 1) of BulkLoaderVertexProgram. 
> Using GryoSerializer instead of KryoSerializer with GryoRegistrator does not 
> fail and successfully loads the data with this compression flag on, whereas 
> before I would just get OOM errors until the job fell so far behind that it 
> failed. So it seems desirable in some instances to use this setting, and the 
> new serialization code appears to break it. This could be a Spark upstream 
> issue based on this open JIRA ticket 
> (https://issues.apache.org/jira/browse/SPARK-3630). Here is the exception 
> that is thrown, with the middle bits cut out:
> com.esotericsoftware.kryo.KryoException: java.io.IOException: PARSING_ERROR(2)
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
>         at com.esotericsoftware.kryo.io.Input.require(Input.java:169)
>         at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:715)
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:113)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(DefaultSerializers.java:103)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:48)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readClassAndObject(UnshadedKryoAdapter.java:30)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.readEdges(StarGraphSerializer.java:134)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:91)
>         at org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer.read(StarGraphSerializer.java:45)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:42)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoAdapter.readObject(UnshadedKryoAdapter.java:30)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:46)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer.read(VertexWritableSerializer.java:36)
>         at org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter.read(UnshadedSerializerAdapter.java:55)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
> ... and so on ...
> Caused by: java.io.IOException: PARSING_ERROR(2)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>         at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
>         at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
>         at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
>         at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
>         at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
>         ... 51 more



