Dan LaRocque created TINKERPOP-1321:
---------------------------------------
Summary: Loosen coupling between TinkerPop serialization logic and shaded Kryo
Key: TINKERPOP-1321
URL: https://issues.apache.org/jira/browse/TINKERPOP-1321
Project: TinkerPop
Issue Type: Improvement
Reporter: Dan LaRocque

When running jobs on Spark, TinkerPop currently recommends setting
spark.serializer=GryoSerializer. This makes GryoSerializer responsible for
serializing not just TinkerPop types but also Scala runtime types and Spark
internals. GryoSerializer doesn't extend either of the two serializers Spark
provides (JavaSerializer and KryoSerializer), so it effectively takes on the
job of reimplementing them.

This is problematic. Replicating the functionality of Spark's standard
serializers is not trivial, and it is hard to test every meaningful case
empirically. For instance, a conditional within Spark selects between two
concrete MapStatus implementations (CompressedMapStatus and
HighlyCompressedMapStatus) depending on whether the shuffle's partition count
exceeds 2000
(https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L47-L53).
The implementation used below this threshold serializes fine with
GryoSerializer; the one used above it does not. Above the threshold, I've
started getting
{{org.apache.spark.SparkException: Job aborted due to stage failure: Exception
while getting task result: org.apache.tinkerpop.shaded.kryo.KryoException:
java.io.IOException: I failed to find one of the right cookies.}} Searching for
this error leads to https://github.com/RoaringBitmap/RoaringBitmap/issues/64.
However, simply switching to Spark's {{KryoSerializer}}, with no other changes,
fixes the problem in my environment. This suggests that Spark has addressed
this problem in a way that TinkerPop may not fully replicate.
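
To make the threshold concrete, here is a simplified Java model of the linked
branch, based on my reading of those lines. Spark's version is Scala, and its
MapStatus classes also carry a block manager location and per-partition
sizes; here the class names mirror Spark's but everything else is a
hypothetical stand-in. It shows why a serializer test suite that only runs
small jobs never exercises the second class.

```java
public class MapStatusThresholdSketch {
    // Simplified model (not Spark's code) of the branch in Spark 1.6's MapStatus.apply.
    static final int THRESHOLD = 2000;

    interface MapStatus {
        int numPartitions();
    }

    // Stand-in for the class chosen at or below the threshold.
    static final class CompressedMapStatus implements MapStatus {
        private final int n;
        CompressedMapStatus(int n) { this.n = n; }
        public int numPartitions() { return n; }
    }

    // Stand-in for the class chosen above the threshold.
    static final class HighlyCompressedMapStatus implements MapStatus {
        private final int n;
        HighlyCompressedMapStatus(int n) { this.n = n; }
        public int numPartitions() { return n; }
    }

    // One entry per partition; strictly more than 2000 entries switches the concrete class.
    static MapStatus create(long[] uncompressedSizes) {
        return uncompressedSizes.length > THRESHOLD
                ? new HighlyCompressedMapStatus(uncompressedSizes.length)
                : new CompressedMapStatus(uncompressedSizes.length);
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 2000, 2001}) {
            System.out.println(n + " -> " + create(new long[n]).getClass().getSimpleName());
        }
    }
}
```

Running it prints CompressedMapStatus for 10 and 2000 partitions and
HighlyCompressedMapStatus for 2001, so a serializer is only tested against the
second class by a job that crosses the threshold.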

Still, "just switch to Spark's KryoSerializer" is not a great approach. For
one thing, we lose the benefit of TinkerPop's space-efficient StarGraph
serializer, and Spark traversals can produce many small ego StarGraphs. These
still serialize, but KryoSerializer falls back to its default behavior
(FieldSerializer), which handles StarGraphs less compactly than TinkerPop's
StarGraphSerializer. Because TinkerPop's serializers are written against its
own internal shaded Kryo (org.apache.tinkerpop.shaded.kryo), they cannot be
registered with Spark's unshaded Kryo.

More concerning, it's impossible to switch completely to KryoSerializer just by
changing configuration. Besides spark.serializer, there is also
spark.closure.serializer, for which the only supported value is
JavaSerializer. Key TinkerPop classes that end up in the object reference
graphs of Spark closures, notably ObjectWritable and VertexWritable, implement
Serializable by delegating to TinkerPop's shaded Kryo via HadoopPools. This
leads to surprises with custom property data types. It doesn't matter whether
those types implement Serializable, and it doesn't matter whether Spark's
KryoSerializer is configured to accept them. If they are reachable from
ObjectWritable or VertexWritable, they must be registered with TinkerPop's
internal shaded Kryo, or it will fail on them (unless it was explicitly
configured to allow unregistered classes).
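
The failure mode can be reproduced in miniature with plain JDK serialization.
This is a hypothetical sketch, not TinkerPop's code: Wrapper plays the role of
ObjectWritable/VertexWritable, the static REGISTRY stands in for TinkerPop's
registration-required shaded Kryo, and Money is a made-up custom property
type. Java serialization handles only the outer envelope, so Money
implementing Serializable does not help.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class RegistryBackedWritableDemo {

    // Hypothetical per-class codec, standing in for a registered Kryo serializer.
    interface Codec<T> {
        void write(DataOutput out, T value) throws IOException;
        T read(DataInput in) throws IOException;
    }

    // Hypothetical stand-in for an internal Kryo that rejects unregistered classes.
    static final Map<String, Codec<?>> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put(String.class.getName(), new Codec<String>() {
            public void write(DataOutput out, String v) throws IOException { out.writeUTF(v); }
            public String read(DataInput in) throws IOException { return in.readUTF(); }
        });
    }

    @SuppressWarnings("unchecked")
    static Codec<Object> lookup(String className) throws NotSerializableException {
        Codec<?> codec = REGISTRY.get(className);
        if (codec == null) {
            throw new NotSerializableException("Class is not registered: " + className);
        }
        return (Codec<Object>) codec;
    }

    // Java serialization sees only this envelope; the payload goes through REGISTRY.
    static final class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Object value;

        Wrapper(Object value) { this.value = value; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            Codec<Object> codec = lookup(value.getClass().getName()); // throws if unregistered
            out.defaultWriteObject();
            out.writeUTF(value.getClass().getName());
            codec.write(out, value);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            value = lookup(in.readUTF()).read(in);
        }
    }

    // Hypothetical custom property type: Serializable, but never registered.
    static final class Money implements Serializable {
        private static final long serialVersionUID = 1L;
        final long cents;
        Money(long cents) { this.cents = cents; }
    }

    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Wrapper back = (Wrapper) fromBytes(toBytes(new Wrapper("alice")));
        System.out.println(back.value); // alice
        try {
            toBytes(new Wrapper(new Money(100)));
        } catch (NotSerializableException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A Wrapper around a String round-trips, while a Wrapper around a Money fails
with NotSerializableException until a codec for Money is added to REGISTRY,
which mirrors having to register the type with TinkerPop's internal Kryo.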

I suggest the following change to give users more flexibility in their choice
of spark.serializer, and to let them reuse TinkerPop's serializers if they
choose not to use GryoSerializer: introduce lightweight interfaces that
decouple TinkerPop's serialization logic from the specific Kryo package
(shaded or unshaded) doing the work. TinkerPop's serialization logic would be
written against interfaces that replicate a minimal subset of Kryo's API, and
either TinkerPop's shaded Kryo or Spark's unshaded Kryo could be plugged in
underneath without touching the source, recompiling any TinkerPop code, or
rewriting bytecode at runtime.
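
A minimal sketch of that decoupling, assuming hypothetical names throughout
(OutputShim, InputShim, SerializerShim, and a toy Ego type standing in for an
ego StarGraph). In a real implementation, one adapter pair would presumably
wrap org.apache.tinkerpop.shaded.kryo.io.Output/Input and another Spark's
unshaded com.esotericsoftware.kryo.io.Output/Input; here two JDK-only backends
play those roles so the example runs without either Kryo on the classpath.
EgoSerializer is written once and never names a concrete Kryo class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class KryoShimSketch {

    // Hypothetical minimal subset of Kryo's Output/Input that serializer logic may use.
    interface OutputShim { void writeLong(long v); void writeInt(int v); }
    interface InputShim { long readLong(); int readInt(); }

    // Hypothetical serializer contract, written against the shims instead of a Kryo package.
    interface SerializerShim<T> {
        void write(OutputShim out, T object);
        T read(InputShim in);
    }

    // Toy stand-in for an ego StarGraph: a center vertex id and its neighbors' ids.
    static final class Ego {
        final long center;
        final long[] neighbors;
        Ego(long center, long[] neighbors) { this.center = center; this.neighbors = neighbors; }
    }

    // Serialization logic written once; it works with any shim backend.
    static final class EgoSerializer implements SerializerShim<Ego> {
        public void write(OutputShim out, Ego e) {
            out.writeLong(e.center);
            out.writeInt(e.neighbors.length);
            for (long n : e.neighbors) out.writeLong(n);
        }
        public Ego read(InputShim in) {
            long center = in.readLong();
            long[] neighbors = new long[in.readInt()];
            for (int i = 0; i < neighbors.length; i++) neighbors[i] = in.readLong();
            return new Ego(center, neighbors);
        }
    }

    // Backend 1 (plays the role of one Kryo flavor): binary encoding via java.io.
    static final class StreamOutput implements OutputShim {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        private final DataOutputStream out = new DataOutputStream(bytes);
        public void writeLong(long v) {
            try { out.writeLong(v); } catch (IOException e) { throw new UncheckedIOException(e); }
        }
        public void writeInt(int v) {
            try { out.writeInt(v); } catch (IOException e) { throw new UncheckedIOException(e); }
        }
    }

    static final class StreamInput implements InputShim {
        private final DataInputStream in;
        StreamInput(byte[] data) { in = new DataInputStream(new ByteArrayInputStream(data)); }
        public long readLong() {
            try { return in.readLong(); } catch (IOException e) { throw new UncheckedIOException(e); }
        }
        public int readInt() {
            try { return in.readInt(); } catch (IOException e) { throw new UncheckedIOException(e); }
        }
    }

    // Backend 2 (plays the role of the other Kryo flavor): an in-memory token queue.
    static final class TokenBuffer implements OutputShim, InputShim {
        private final Deque<Object> tokens = new ArrayDeque<>();
        public void writeLong(long v) { tokens.addLast(v); }
        public void writeInt(int v) { tokens.addLast(v); }
        public long readLong() { return (Long) tokens.removeFirst(); }
        public int readInt() { return (Integer) tokens.removeFirst(); }
    }

    public static void main(String[] args) {
        EgoSerializer serializer = new EgoSerializer();
        Ego ego = new Ego(1L, new long[] {2L, 3L});

        StreamOutput streamOut = new StreamOutput();
        serializer.write(streamOut, ego);
        Ego viaStream = serializer.read(new StreamInput(streamOut.bytes.toByteArray()));

        TokenBuffer tokens = new TokenBuffer();
        serializer.write(tokens, ego);
        Ego viaTokens = serializer.read(tokens);

        System.out.println(viaStream.center + " " + Arrays.toString(viaStream.neighbors));
        System.out.println(viaTokens.center + " " + Arrays.toString(viaTokens.neighbors));
    }
}
```

Keeping the shim surface to the few primitives the serializers actually call
keeps each adapter a thin pass-through, which is what makes supporting a
second Kryo flavor cheap.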

This would not resolve all of the potential problems and complexity around
TinkerPop serialization, but it would let users apply the spark.serializer of
their choice, switching away from GryoSerializer if they so choose. Users
could also continue to use GryoSerializer as they have until now.

I've already run this through a few iterations locally and have an
abstraction that lets me run with spark.serializer=KryoSerializer, register
GryoMapper/GryoSerializer's standard set of types, reuse TinkerPop's StarGraph
serializer, and bypass GryoSerializer in HadoopPools as well. I've just
rebased it on current master and will submit a PR for discussion and review.