Hi,

We use Spark to build graphs of events after querying Cassandra. We use mapPartitions both to aggregate events and to build two graphs per partition. The graphs are returned as a Tuple2, as follows:
val nodes = events.mapPartitions(part => {
  var nodeLeft: Node = null
  var nodeRight: Node = null
  part.foreach(seqEvents => {
    val (left, right): (Node, Node) = ??? // build graphs from seqEvents (elided in the original)
    // Merge nodes
    nodeLeft = if (nodeLeft == null) left else nodeLeft.merge(left)
    nodeRight = if (nodeRight == null) right else nodeRight.merge(right)
  })
  if (nodeLeft == null || nodeRight == null)
    throw new IllegalStateException("Left/Right node cannot be null")
  val l = (nodeLeft, nodeRight) :: Nil
  l.iterator
})

val graph = nodes.reduce((t1, t2) => {
  val (left1, right1) = t1
  val (left2, right2) = t2
  // Always throws NullPointerException on right1
  (left1.merge(left2), right1.merge(right2))
})

The tuples of graphs are serialized correctly (we use a custom serializer). However, in the reduce method we get null instead of the right graph. The problem seems to be caused by Kryo serialization: when we use Java serialization instead, the tuples are deserialized correctly. In addition, when we use two separate mapPartitions/reduce passes, the first for the left graphs and the second for the right graphs, Kryo deserializes the objects correctly.
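The fact that each graph round-trips on its own but not as part of a Tuple2 is consistent with a read() that consumes fewer bytes than write() produced: trailing bytes are harmless at the end of a stream, but inside a tuple they would be read as the start of the second element. In the serializer below, a child count of 0 is written both for a node without children and as the final end marker. For example, with root → (A, B) and B → C, write() emits root, 2, A, B, 0, 1, C, 0, while read() stops at the first 0 and leaves 1, C, 0 unread, which may be why the right graph comes back null. The sketch below shows one self-delimiting alternative: every node is written breadth-first as (event, childCount), with no separate EOF marker. It is only an illustration under stated assumptions. It uses plain DataOutputStream/DataInputStream instead of Kryo so that it runs on its own, and `TreeCodecSketch`, its String-based `Node`, `write`, `read` and `demo` are hypothetical stand-ins for the real classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class TreeCodecSketch {

    /** Hypothetical stand-in for Node; the event payload is a plain String here. */
    static final class Node {
        final String event;
        final List<Node> children = new ArrayList<>();

        Node(String event) { this.event = event; }

        /** Adds a child holding the given event and returns the new child. */
        Node add(String childEvent) {
            Node child = new Node(childEvent);
            children.add(child);
            return child;
        }

        @Override
        public String toString() {
            if (children.isEmpty()) return event;
            StringJoiner joiner = new StringJoiner(",", event + "(", ")");
            for (Node c : children) joiner.add(c.toString());
            return joiner.toString();
        }
    }

    /** Writes each node breadth-first as (event, childCount); no separate EOF marker. */
    static void write(DataOutputStream out, Node root) throws IOException {
        ArrayDeque<Node> queue = new ArrayDeque<>();
        queue.add(root);
        while (!queue.isEmpty()) {
            Node n = queue.poll();
            out.writeUTF(n.event);           // with Kryo this would be kryo.writeObject(output, event)
            out.writeInt(n.children.size()); // with Kryo this would be output.writeInt(count)
            queue.addAll(n.children);
        }
    }

    /** Reads back exactly the fields write() produced, so the next value starts at the right offset. */
    static Node read(DataInputStream in) throws IOException {
        Node root = new Node(in.readUTF());
        ArrayDeque<Node> parents = new ArrayDeque<>();   // nodes whose children are still to be read
        ArrayDeque<Integer> counts = new ArrayDeque<>(); // child count for each entry in parents
        parents.add(root);
        counts.add(in.readInt());
        while (!parents.isEmpty()) {
            Node parent = parents.poll();
            int count = counts.poll();
            for (int i = 0; i < count; i++) {
                Node child = parent.add(in.readUTF());
                parents.add(child);
                counts.add(in.readInt());
            }
        }
        return root;
    }

    /** Round-trips two trees written back to back, like the two elements of a Tuple2. */
    static List<String> demo() {
        // A leaf (A) precedes a node with children (B) in breadth-first order.
        Node left = new Node("root");
        left.add("A");
        left.add("B").add("C");
        Node right = new Node("r");
        right.add("x");
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(out, left);
                write(out, right);
            }
            List<String> result = new ArrayList<>();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                result.add(read(in).toString());
                result.add(read(in).toString());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [root(A,B(C)), r(x)]
    }
}
```

Because every node carries its own child count, the reader knows exactly when the tree ends, and the second tree is read from the correct offset. Writing two values back to back and reading both, as demo() does, is also a cheap way to test any custom serializer outside Spark.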
This is our serializer class:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.Iterator;
import java.util.LinkedList;

public class NodeSerializer extends Serializer<Node> {

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(Kryo kryo, Output output, Node object) {
        LinkedList<Node> nQueue = new LinkedList<>();
        nQueue.add(object);
        LinkedList<Integer> sQueue = new LinkedList<>();
        Iterator<Node> it = nQueue.iterator();
        Integer eol = 1;
        while (it.hasNext()) {
            eol--;
            if (eol < 0) {
                eol = sQueue.poll();
                output.writeInt(eol);
                continue;
            }
            Node o = nQueue.poll();
            nQueue.addAll(o.getChildren());
            sQueue.add(o.getChildren().size());
            kryo.writeObject(output, o.getEvent());
        }
        output.writeInt(0); // EOF
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Node read(Kryo kryo, Input input, Class<Node> type) {
        Event object = kryo.readObject(input, Event.class);
        Node root = new Node(object);
        Node leaf = root;
        int eol = input.readInt();
        LinkedList<Node> queue = new LinkedList<>();
        boolean eof = eol == 0;
        while (!eof) {
            if (eol == 0) {
                eol = input.readInt();
                leaf = queue.poll();
                eof = eol == 0;
                continue;
            }
            object = kryo.readObject(input, Event.class);
            queue.add(leaf.add(object));
            eol--;
        }
        return root;
    }
}

Thank you,

--
Florian HUSSONNOIS