Hi everybody,

I'm really sorry to bring this problem up again, but it is blocking my project. Please help me solve it. If you need more information, please don't hesitate to ask.

Thanks in advance,
Chadi

From: chadijaber...@hotmail.com
To: user@giraph.apache.org
Subject: Problem with Giraph
Date: Tue, 31 Dec 2013 17:09:39 +0100

Hello,

I am not sure this is the right way to submit my problem with Giraph; if it is not, I apologize. I have developed an algorithm inspired by the Giraph shortest-path example, but in which the graph is constructed during the first supersteps, before the shortest-path search. My application works fine with one worker (on one machine). However, when more workers are used (4 on one machine, or on a cluster), the following error often appears:

...
2013-12-31 16:27:33,472 INFO org.apache.giraph.comm.netty.NettyClient: Using Netty without authentication.
2013-12-31 16:27:33,478 INFO org.apache.giraph.comm.netty.NettyServer: start: Using Netty without authentication.
2013-12-31 16:27:33,480 INFO org.apache.giraph.comm.netty.NettyServer: start: Using Netty without authentication.
2013-12-31 16:27:33,482 INFO org.apache.giraph.comm.netty.NettyServer: start: Using Netty without authentication.
2013-12-31 16:27:33,484 INFO org.apache.giraph.comm.netty.NettyClient: Using Netty without authentication.
2013-12-31 16:27:33,485 INFO org.apache.giraph.comm.netty.NettyClient: Using Netty without authentication.
2013-12-31 16:27:33,487 INFO org.apache.giraph.comm.netty.NettyClient: Using Netty without authentication.
2013-12-31 16:27:33,494 INFO org.apache.giraph.comm.netty.NettyClient: connectAllAddresses: Successfully added 4 connections, (4 total connected) 0 failed, 0 failures total.
2013-12-31 16:27:33,501 INFO org.apache.giraph.worker.BspServiceWorker: loadInputSplits: Using 1 thread(s), originally 1 threads(s) for 1 total splits.
2013-12-31 16:27:33,508 INFO org.apache.giraph.comm.SendPartitionCache: SendPartitionCache: maxVerticesPerTransfer = 10000
2013-12-31 16:27:33,508 INFO org.apache.giraph.comm.SendPartitionCache: SendPartitionCache: maxEdgesPerTransfer = 80000
2013-12-31 16:27:33,524 INFO org.apache.giraph.worker.InputSplitsCallable: call: Loaded 0 input splits in 0.020270009 secs, (v=0, e=0) 0.0 vertices/sec, 0.0 edges/sec
2013-12-31 16:27:33,527 INFO org.apache.giraph.comm.netty.NettyClient: waitAllRequests: Finished all requests. MBytes/sec sent = 0, MBytes/sec received = 0, MBytesSent = 0, MBytesReceived = 0, ave sent req MBytes = 0, ave received req MBytes = 0, secs waited = 0.656
2013-12-31 16:27:33,527 INFO org.apache.giraph.worker.BspServiceWorker: setup: Finally loaded a total of (v=0, e=0)
2013-12-31 16:27:33,598 INFO org.apache.giraph.comm.netty.handler.RequestDecoder: decode: Server window metrics MBytes/sec sent = 0, MBytes/sec received = 0, MBytesSent = 0, MBytesReceived = 0, ave sent req MBytes = 0, ave received req MBytes = 0, secs waited = 0.816
2013-12-31 16:27:33,605 WARN org.apache.giraph.comm.netty.handler.RequestServerHandler: exceptionCaught: Channel failed with remote address /172.16.45.53:59257
java.io.EOFException
    at org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:231)
    at org.jboss.netty.buffer.ChannelBufferInputStream.readInt(ChannelBufferInputStream.java:174)
    at org.apache.giraph.edge.ByteArrayEdges.readFields(ByteArrayEdges.java:172)
    at org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:480)
    at org.apache.giraph.utils.WritableUtils.readVertexFromDataInput(WritableUtils.java:511)
    at org.apache.giraph.partition.SimplePartition.readFields(SimplePartition.java:126)
    at org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:66)
    at org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:120)
    at org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:92)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:72)
    at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

The code for my vertex compute function:

public class MergeVertex extends Vertex<LongWritable, DoubleWritable, DoubleWritable, NodeMessage> {
    ...

    /**
     * Convert a vertex ID from its LongWritable format to point format (2-element array).
     * @param lng LongWritable format of the vertex ID
     * @return alignment point array
     */
    public static int[] cvtLongToPoint(LongWritable lng) {
        int[] point = {0, 0};
        point[0] = (int) (lng.get() / 1000);
        point[1] = (int) (lng.get() % 1000);
        return point;
    }

    @Override
    public void compute(Iterable<NodeMessage> messages) throws IOException {
        int[] currentId = cvtLongToPoint(getId());
        if (getSuperstep() == 0) {
            //NodeValue nv=new NodeValue();
            setValue(new DoubleWritable(0d));
        }
        _signallength = getContext().getConfiguration().getInt("SignalLength", 0);
        if ((getSuperstep() < _signallength && getId().get() != 0L)
                || (getSuperstep() == 0 && getId().get() == 0L)) {
            LongWritable dstId = new LongWritable();
            if (currentId[0] == currentId[1]) {
                // Nodes which are on the graph "spine":
                // construct the target vertices and the remaining edges
                // Right side
                for (int i = currentId[1] + 1; i < _signallength; i++) {
                    dstId = cvtPointToLong(currentId[0] + 1, i);
                    addVertexRequest(dstId, new DoubleWritable(Double.MAX_VALUE));
                    addEdgeRequest(getId(), EdgeFactory.create(dstId,
                            new DoubleWritable(computeCost(getId(), dstId))));
                }
                // Left side
                for (int i = currentId[0] + 2; i < _signallength; i++) {
                    dstId = cvtPointToLong(i, currentId[1] + 1);
                    addVertexRequest(dstId, new DoubleWritable(Double.MAX_VALUE));
                    addEdgeRequest(getId(), EdgeFactory.create(dstId,
                            new DoubleWritable(computeCost(getId(), dstId))));
                }
            } else {
                // Nodes which are not on the graph "spine":
                // construct the remaining edges only
                // Right side
                if (currentId[0] + 1 < _signallength) {
                    for (int i = currentId[1] + 1; i < _signallength; i++) {
                        dstId = cvtPointToLong(currentId[0] + 1, i);
                        addEdgeRequest(getId(), EdgeFactory.create(dstId,
                                new DoubleWritable(computeCost(getId(), dstId))));
                    }
                }
                // Left side
                if (currentId[1] + 1 < _signallength) {
                    for (int i = currentId[0] + 2; i < _signallength; i++) {
                        dstId = cvtPointToLong(i, currentId[1] + 1);
                        addEdgeRequest(getId(), EdgeFactory.create(dstId,
                                new DoubleWritable(computeCost(getId(), dstId))));
                    }
                }
            }
            // No vertex other than the source needs to stay active
            if (getId().get() != 0L) {
                voteToHalt();
            }
        } else if (getSuperstep() >= _signallength && getSuperstep() < 2 * _signallength) {
            double minDist;
            long minSource = 0L;
            if (getId().get() == 0L) {
                minDist = 0;
            } else {
                minDist = Double.MAX_VALUE;
            }
            for (NodeMessage message : messages) {
                if (minDist > message.get()) {
                    minDist = message.get();
                    minSource = message.getSourceID();
                }
            }
            if (minDist < getValue().get()) {
                setValue(new DoubleWritable(minDist));
                for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
                    double distance = minDist + edge.getValue().get();
                    sendMessage(edge.getTargetVertexId(),
                            new NodeMessage(distance, getId().get()));
                }
            }
            // Only the last node stays active
            if (currentId[0] != _signallength - 1 || currentId[1] != _signallength - 1) {
                voteToHalt();
            }
        } else if (getSuperstep() >= 2 * _signallength) {
            voteToHalt();
        }
    }
}

If you need more details, please don't hesitate to ask.

Thanks in advance,
Chadi
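
For reference, the vertex ID encoding used by cvtLongToPoint in the code above can be sketched on its own. The cvtPointToLong method is not shown in the message, so the toLong function below is an assumption: it is simply the inverse of cvtLongToPoint. The class name VertexIdCodec and the WIDTH constant are hypothetical names used only for illustration, and plain long/int values stand in for LongWritable. Under that assumption, the encoding is only unambiguous while the second coordinate stays below 1000.

```java
// Standalone sketch of the vertex ID encoding (no Giraph or Hadoop dependency).
final class VertexIdCodec {
    // Divisor taken from cvtLongToPoint in the message above.
    static final int WIDTH = 1000;

    // Same arithmetic as cvtLongToPoint: id -> {id / 1000, id % 1000}.
    static int[] toPoint(long id) {
        return new int[] {(int) (id / WIDTH), (int) (id % WIDTH)};
    }

    // ASSUMPTION: cvtPointToLong is not shown in the message; this is
    // the plain inverse of toPoint, not the author's confirmed code.
    static long toLong(int first, int second) {
        return (long) first * WIDTH + second;
    }

    public static void main(String[] args) {
        // Round trip for a point whose second coordinate is below WIDTH.
        int[] p = toPoint(toLong(3, 7));
        System.out.println(p[0] + "," + p[1]); // prints "3,7"

        // Two different points collide once a coordinate reaches WIDTH.
        System.out.println(toLong(0, 1000) == toLong(1, 0)); // prints "true"
    }
}
```

If the assumed inverse matches the real cvtPointToLong, a SignalLength value above 1000 would make distinct points share one vertex ID.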