Hi,

We use Spark to build graphs of events after querying cassandra. We use
mapPartition for both aggregating events and building two graphs per
partition. Graphs are returned as Tuple2 as follows :

      val nodes = events.mapPartitions(part => {

      var nodeLeft : Node = null
      var nodeRight: Node = null

      part.foreach(seqEvents => {

        val (left, right) = // build graphs from seqEvents

// Merge nodes
        nodeLeft = if( nodeLeft == null) left else nodeLeft.merge(left)
        nodeRight = if( nodeRight == null) right else nodeRight.merge(right)
      })

      if( nodeLeft == null || nodeRight == null) throw new
IllegalStateException("Left/Right node cannot be null")

      val l = (nodeLeft, nodeRight) :: Nil
      l.iterator
    })

    val graph = nodes.reduce((t1, t2) => {
      val (left1, right1) = t1
      val (left2, right2) = t2

      // Always throws NullPointerException on right1
      (left1.merge(left2), right1.merge(right2))
    })


Tuples of graphs are serialized correctly (we use a custom serializer).
However, in the reduce method we get null instead of the right graph.

The problem seems to be cause by the Kryo Serialization. In fact, when we
used java serialization Tuples are deserialized correctly.

In addition, using two mapPartition/reduce, the first one for left graphs
and the second one for right graphs the objects are deserialized correctly
by kryo.

This is our serializer class :

    public class NodeSerializer  extends Serializer<Node> {

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(Kryo kryo, Output output, Node object) {

        LinkedList<Node> nQueue = new LinkedList<>();
        nQueue.add(object);

        LinkedList<Integer> sQueue = new LinkedList<>();

        Iterator<Node> it = nQueue.iterator();

        Integer eol = 1;
        while (it.hasNext()) {
            eol--;
            if( eol < 0) {
                eol = sQueue.poll();
                output.writeInt(eol);
                continue;
            }
            Node o = nQueue.poll();
            nQueue.addAll(o.getChildren());
            sQueue.add(o.getChildren().size());
            kryo.writeObject(output, o.getEvent());
        }
        output.writeInt(0); // EOF
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Node read(Kryo kryo, Input input, Class<Node> type) {

        Event object = kryo.readObject(input, Event.class);
        Node root = new Node(object);
        Node leaf = root;
        int eol = input.readInt();

        LinkedList<Node> queue = new LinkedList<>();

        boolean eof = eol == 0;
        while( ! eof ) {
            if( eol == 0 ) {
                eol  = input.readInt();
                leaf = queue.poll();
                eof = eol == 0;
                continue;
            }
            object = kryo.readObject(input, Event.class);
            queue.add(leaf.add(object));
            eol--;
        }
        return root;
    }
}

Thanks you

-- 
Florian HUSSONNOIS

Reply via email to