edgeList.add(
    new DefaultEdge<Text, NullWritable>(
        new Text(tokens[i]),
        null
    )
);

This is the issue: NullWritable and null are not the same thing.
You should replace null with NullWritable.get(), or, better yet, use:

new EdgeNoValue<Text>(new Text(tokens[i]))

Let me know if this doesn't work.
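To illustrate why the distinction matters, here is a self-contained sketch. The classes below (Writable, NullValue, Edge, NullEdgeDemo) are hypothetical stand-ins for the real Hadoop/Giraph types, written only to show the failure mode: NullWritable-style values are singletons whose write() is a no-op, so serialization succeeds, whereas a literal null edge value throws a NullPointerException the moment the partition is written, which matches the stack trace later in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for Hadoop's Writable interface (illustration only).
interface Writable {
    void write(DataOutput out) throws IOException;
}

// Stand-in for NullWritable: a singleton whose write() emits nothing.
final class NullValue implements Writable {
    private static final NullValue INSTANCE = new NullValue();
    private NullValue() {}
    static NullValue get() { return INSTANCE; }
    @Override public void write(DataOutput out) { /* writes no bytes */ }
}

// Stand-in for an edge holding a value; write() serializes it blindly,
// just as EdgeListVertexBase.write() does in the stack trace below.
class Edge<V extends Writable> {
    private final V value;
    Edge(V value) { this.value = value; }
    void write(DataOutput out) throws IOException {
        value.write(out); // NPE here if value == null
    }
}

public class NullEdgeDemo {
    public static void main(String[] args) throws IOException {
        DataOutput out = new DataOutputStream(new ByteArrayOutputStream());

        // Correct: a no-op singleton value serializes cleanly.
        new Edge<NullValue>(NullValue.get()).write(out);
        System.out.println("NullValue.get(): ok");

        // Buggy: a literal null value explodes during serialization.
        try {
            new Edge<NullValue>(null).write(out);
        } catch (NullPointerException e) {
            System.out.println("null value: NullPointerException");
        }
    }
}
```

The key point is that the null only blows up at serialization time, i.e. when partitions are shipped between workers, which is why a single-worker run (no network transfer of vertices) can appear to work fine.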

From: Zachary Hanif <zh4...@gmail.com>
Reply-To: user@giraph.apache.org
Date: Wednesday, February 13, 2013 1:11 PM
To: user@giraph.apache.org
Subject: Re: Giraph/Netty issues on a cluster

Sure thing!

Credit where it's due, this is heavily cribbed from 
https://github.com/castagna/jena-grande/tree/master/src/main/java/org/apache/jena/grande/giraph

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.giraph.bsp.BspUtils;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


public class TestingVertexInputFormat extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable> {

    private static final Logger log = LoggerFactory.getLogger(TestingVertexReader.class);

    @Override
    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new TestingVertexReader();
    }

    public class TestingVertexReader extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>.TextVertexReader {

        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
            return getRecordReader().nextKeyValue();
        }

        @Override
        public Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> getCurrentVertex() throws IOException, InterruptedException {
            Configuration conf = getContext().getConfiguration();
            String line = getRecordReader().getCurrentValue().toString();
            Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> vertex = BspUtils.createVertex(conf);
            log.info("tokens() --> {}", "originalString = ", line);
            String[] tokens = line.split(",");
            Text vertexId = new Text(tokens[0]);

            DoubleWritable vertexValue = new DoubleWritable(Double.valueOf(tokens[1]));
            List<Edge<Text, NullWritable>> edgeList = Lists.newArrayList();
            for (int i = 2; i < tokens.length; i++) {
                if (!tokens[0].equals(tokens[i])) {
                    edgeList.add(
                        new DefaultEdge<Text, NullWritable>(
                            new Text(tokens[i]),
                            null
                        )
                    );
                }
            }
            if (vertexValue.get() != -1.0 || vertexValue.get() != 1.0) {
                vertexValue = new DoubleWritable(Double.valueOf(1.0));
                log.info("tokens() --> {}", "val1 = ", tokens[0]);
                log.info("tokens() --> {}", "val2 = ", tokens[1]);
                log.info("tokens() --> {}", "val2 = ", line);
                log.info("tokens() --> {}", "key = ", vertexId);
                log.info("tokens() --> {}", "value = ", vertexValue);
            }
            vertex.initialize(vertexId, vertexValue, edgeList);
            return vertex;
        }
    }
}

On Wed, Feb 13, 2013 at 3:59 PM, Alessandro Presta <alessan...@fb.com> wrote:
Can you post your VertexInputFormat code?

From: Zachary Hanif <zh4...@gmail.com>
Reply-To: user@giraph.apache.org
Date: Wednesday, February 13, 2013 12:31 PM
To: user@giraph.apache.org
Subject: Re: Giraph/Netty issues on a cluster

It is my own code. I'm staring at my VertexInputFormat class right now. It extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>. I cannot imagine why a value would not be set for these vertices, but I'll drop in some code to more stringently ensure value creation.

Why would this begin to fail on a distributed deployment (multiple workers) but not with a single worker? The dataset is identical between the two executions.

On Wed, Feb 13, 2013 at 2:35 PM, Alessandro Presta <alessan...@fb.com> wrote:
Hi Zachary,

Are you running one of the examples or your own code?
It seems to me that a call to edge.getValue() is returning null, which should 
never happen.

Alessandro

From: Zachary Hanif <zh4...@gmail.com>
Reply-To: user@giraph.apache.org
Date: Wednesday, February 13, 2013 11:29 AM
To: user@giraph.apache.org
Subject: Giraph/Netty issues on a cluster

(How embarrassing! I forgot a subject header in a previous attempt to post 
this. Please reply to this thread, not the other.)

Hi everyone,

I am having some odd issues when trying to run a Giraph 0.2 job across my CDH 
3u3 cluster. After building the jar, and deploying it across the cluster, I 
start to notice a handful of my nodes reporting the following error:

2013-02-13 17:47:43,341 WARN org.apache.giraph.comm.netty.handler.ResponseClientHandler: exceptionCaught: Channel failed with remote address <EDITED_INTERNAL_DNS>/10.2.0.16:30001
java.lang.NullPointerException
    at org.apache.giraph.vertex.EdgeListVertexBase.write(EdgeListVertexBase.java:106)
    at org.apache.giraph.partition.SimplePartition.write(SimplePartition.java:169)
    at org.apache.giraph.comm.requests.SendVertexRequest.writeRequest(SendVertexRequest.java:71)
    at org.apache.giraph.comm.requests.WritableRequest.write(WritableRequest.java:127)
    at org.apache.giraph.comm.netty.handler.RequestEncoder.encode(RequestEncoder.java:96)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:61)
    at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:185)
    at org.jboss.netty.channel.Channels.write(Channels.java:712)
    at org.jboss.netty.channel.Channels.write(Channels.java:679)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:246)
    at org.apache.giraph.comm.netty.NettyClient.sendWritableRequest(NettyClient.java:655)
    at org.apache.giraph.comm.netty.NettyWorkerClient.sendWritableRequest(NettyWorkerClient.java:144)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:425)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.sendPartitionRequest(NettyWorkerClientRequestProcessor.java:195)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:365)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:190)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:58)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

What would be causing this? All other Hadoop jobs run well on the cluster, and when the Giraph job is run with only one worker, it completes without any issues. When run with any number of workers >1, the above error occurs. I have referenced this post (http://mail-archives.apache.org/mod_mbox/giraph-user/201209.mbox/%3ccaeq6y7shc4in-l73nr7abizspmrrfw9sfa8tmi3myqml8vk...@mail.gmail.com%3E) where superficially similar issues were discussed, but the root cause appears to be different, and the suggested methods of resolution are not panning out.

As extra background, the 'remote address' changes as the error cycles through my available cluster nodes, and the failing workers do not seem to favor one physical machine over another. Not all nodes present this issue, only a handful per job. Is there something simple that I am missing?

