edgeList.add( new DefaultEdge<Text, NullWritable>( new Text(tokens[i]), null ) );
This is the issue: NullWritable and null are not the same. You should replace null with NullWritable.get(), or even better use:

new EdgeNoValue<Text>(new Text(tokens[i]))

Let me know if this doesn't work.

From: Zachary Hanif <zh4...@gmail.com>
Reply-To: "user@giraph.apache.org" <user@giraph.apache.org>
Date: Wednesday, February 13, 2013 1:11 PM
To: "user@giraph.apache.org" <user@giraph.apache.org>
Subject: Re: Giraph/Netty issues on a cluster

Sure thing! Credit where it's due, this is heavily cribbed from https://github.com/castagna/jena-grande/tree/master/src/main/java/org/apache/jena/grande/giraph

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.giraph.bsp.BspUtils;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class TestingVertexInputFormat extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable> {

    private static final Logger log = LoggerFactory.getLogger(TestingVertexReader.class);

    @Override
    public TextVertexReader createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new TestingVertexReader();
    }

    public class TestingVertexReader extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>.TextVertexReader {

        @Override
        public boolean nextVertex() throws IOException, InterruptedException {
            boolean result = getRecordReader().nextKeyValue();
            return result;
        }

        @Override
        public Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> getCurrentVertex() throws IOException, InterruptedException {
            Configuration conf = getContext().getConfiguration();
            String line = getRecordReader().getCurrentValue().toString();
            Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> vertex = BspUtils.createVertex(conf);
            log.info("tokens() --> {}", "originalString = ", line);
            String tokens[] = line.split(",");
            Text vertexId = new Text(tokens[0]);
            DoubleWritable vertexValue = new DoubleWritable(Double.valueOf(tokens[1]));
            List<Edge<Text, NullWritable>> edgeList = Lists.newArrayList();
            for ( int i = 2; i < tokens.length; i++ ) {
                if ( !tokens[0].equals(tokens[i]) ) {
                    edgeList.add( new DefaultEdge<Text, NullWritable>( new Text(tokens[i]), null ) );
                }
            }
            if(vertexValue.get() != -1.0 || vertexValue.get() != 1.0){
                vertexValue = new DoubleWritable(Double.valueOf(1.0));
                log.info("tokens() --> {}", "val1 = ", tokens[0]);
                log.info("tokens() --> {}", "val2 = ", tokens[1]);
                log.info("tokens() --> {}", "val2 = ", line);
                log.info("tokens() --> {}", "key = ", vertexId);
                log.info("tokens() --> {}", "value = ", vertexValue);
            }
            vertex.initialize ( vertexId, vertexValue, edgeList );
            return vertex;
        }
    }
}

On Wed, Feb 13, 2013 at 3:59 PM, Alessandro Presta <alessan...@fb.com> wrote:

Can you post your VertexInputFormat code?
From: Zachary Hanif <zh4...@gmail.com>
Reply-To: "user@giraph.apache.org" <user@giraph.apache.org>
Date: Wednesday, February 13, 2013 12:31 PM
To: "user@giraph.apache.org" <user@giraph.apache.org>
Subject: Re: Giraph/Netty issues on a cluster

It is my own code. I'm staring at my VertexInputFormat class right now. It extends TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>. I cannot imagine why a value would not be set for these vertexes, but I'll drop in some code to more stringently ensure value creation.

Why would this begin to fail on a distributed deployment (multiple workers) but not with a single worker? The dataset is identical between the two executions.

On Wed, Feb 13, 2013 at 2:35 PM, Alessandro Presta <alessan...@fb.com> wrote:

Hi Zachary,

Are you running one of the examples or your own code? It seems to me that a call to edge.getValue() is returning null, which should never happen.

Alessandro

From: Zachary Hanif <zh4...@gmail.com>
Reply-To: "user@giraph.apache.org" <user@giraph.apache.org>
Date: Wednesday, February 13, 2013 11:29 AM
To: "user@giraph.apache.org" <user@giraph.apache.org>
Subject: Giraph/Netty issues on a cluster

(How embarrassing! I forgot a subject header in a previous attempt to post this. Please reply to this thread, not the other.)

Hi everyone,

I am having some odd issues when trying to run a Giraph 0.2 job across my CDH 3u3 cluster.
After building the jar and deploying it across the cluster, I start to notice a handful of my nodes reporting the following error:

2013-02-13 17:47:43,341 WARN org.apache.giraph.comm.netty.handler.ResponseClientHandler: exceptionCaught: Channel failed with remote address <EDITED_INTERNAL_DNS>/10.2.0.16:30001
java.lang.NullPointerException
    at org.apache.giraph.vertex.EdgeListVertexBase.write(EdgeListVertexBase.java:106)
    at org.apache.giraph.partition.SimplePartition.write(SimplePartition.java:169)
    at org.apache.giraph.comm.requests.SendVertexRequest.writeRequest(SendVertexRequest.java:71)
    at org.apache.giraph.comm.requests.WritableRequest.write(WritableRequest.java:127)
    at org.apache.giraph.comm.netty.handler.RequestEncoder.encode(RequestEncoder.java:96)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:61)
    at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:185)
    at org.jboss.netty.channel.Channels.write(Channels.java:712)
    at org.jboss.netty.channel.Channels.write(Channels.java:679)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:246)
    at org.apache.giraph.comm.netty.NettyClient.sendWritableRequest(NettyClient.java:655)
    at org.apache.giraph.comm.netty.NettyWorkerClient.sendWritableRequest(NettyWorkerClient.java:144)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:425)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.sendPartitionRequest(NettyWorkerClientRequestProcessor.java:195)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:365)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:190)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:58)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

What would be causing this? All other Hadoop jobs run well on the cluster, and when the Giraph job is run with only one worker, it completes without any issues. When run with any number of workers >1, the above error occurs. I have referenced this post <http://mail-archives.apache.org/mod_mbox/giraph-user/201209.mbox/%3ccaeq6y7shc4in-l73nr7abizspmrrfw9sfa8tmi3myqml8vk...@mail.gmail.com%3E> where superficially similar issues were discussed, but the root cause appears to be different, and suggested methods of resolution are not panning out.

As extra background, the 'remote address' changes as the error cycles through my available cluster nodes, and the failing workers do not seem to favor one physical machine over another. Not all nodes present this issue, only a handful per job.

Is there something simple that I am missing?
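
Editorial note: the stack trace above fails inside EdgeListVertexBase.write, reached from SendVertexRequest, i.e. while a vertex and its edges are being serialized for another worker. That would be consistent with the single-worker run succeeding, since presumably nothing has to be sent over the network there. The sketch below illustrates the difference between a null edge value and a "no value" singleton during serialization. It deliberately does not use the Hadoop or Giraph classes: SimpleWritable, NoValue and SimpleEdge are hypothetical stand-ins invented for this example (loosely modelled on Writable, NullWritable and an edge), so it runs with the JDK alone and should not be read as Giraph's actual implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class NullEdgeValueSketch {

    /** Hypothetical stand-in for the write side of Hadoop's Writable. */
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
    }

    /** Hypothetical stand-in for NullWritable: a real object that writes zero bytes. */
    static final class NoValue implements SimpleWritable {
        private static final NoValue INSTANCE = new NoValue();

        private NoValue() { }

        static NoValue get() {
            return INSTANCE;
        }

        @Override
        public void write(DataOutput out) {
            // Intentionally empty: "no value" serializes to nothing.
        }
    }

    /** Hypothetical stand-in for an edge: a target id plus an edge value. */
    static final class SimpleEdge {
        final String targetId;
        final SimpleWritable value;

        SimpleEdge(String targetId, SimpleWritable value) {
            this.targetId = targetId;
            this.value = value;
        }

        void write(DataOutput out) throws IOException {
            out.writeUTF(targetId);
            value.write(out); // throws NullPointerException when value is null
        }
    }

    /** Serializes the edge into memory and returns the number of bytes written. */
    static int serializedSize(SimpleEdge edge) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            edge.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Returns true if serializing the edge fails with a NullPointerException. */
    static boolean failsWithNpe(SimpleEdge edge) {
        try {
            serializedSize(edge);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A null edge value only blows up once the edge is serialized.
        System.out.println(failsWithNpe(new SimpleEdge("b", null)));          // true
        // The singleton is a real object, so serialization succeeds.
        System.out.println(failsWithNpe(new SimpleEdge("b", NoValue.get()))); // false
    }
}
```

In this sketch, constructing the edge with null succeeds and the failure only appears when write is called, which mirrors why the input format can appear to work until vertices are actually shipped between workers.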