I can try the netty version then. But what causes the "connection reset by peer" error? A timeout? And if it happens, how can I reestablish the connection? I could add some code that checks the connection first and reestablishes it, if it was reset by peer, before calling putVertexIdMessagesList.
Yuanyuan

From: Avery Ching <ach...@apache.org>
To: user@giraph.apache.org
Date: 06/28/2012 01:20 AM
Subject: Re: wierd communication errors

In my testing, I found the netty implementation of Giraph (trunk) to be more stable than Hadoop RPC. But there isn't much you can do (other than reestablish the connection) when the connection is reset by peer.

Avery

On 6/28/12 12:29 AM, Yuanyuan Tian wrote:

I want to correct the errors I reported. The actual error is shown below; the errors in my previous email came from a debug message I had added. The problem is the same, though: somehow a connection was reset by peer. I tried several more times. Occasionally my job runs without a problem, but more often it fails because of this connection reset. I really don't have a clue what the problem is.

Yuanyuan

java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1085)
    at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
    at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1080)
    ... 10 more
Caused by: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
    at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:379)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
    at org.apache.hadoop.ipc.Client.call(Client.java:1033)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
    at $Proxy3.putVertexIdMessagesList(Unknown Source)
    at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:339)
    ... 6 more
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
    at sun.nio.ch.IOUtil.read(IOUtil.java:175)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
    at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
    at java.io.FilterInputStream.read(FilterInputStream.java:116)
    at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
    at java.io.DataInputStream.readInt(DataInputStream.java:370)
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)

From: Yuanyuan Tian/Almaden/IBM@IBMUS
To: user@giraph.apache.org
Cc: user@giraph.apache.org
Date: 06/27/2012 10:02 PM
Subject: Re: wierd communication errors

What do you mean by using netty? I was not aware that Giraph uses netty. I am just using whatever the default giraph release 1.0 uses.

Yuanyuan

From: Avery Ching <ach...@apache.org>
To: user@giraph.apache.org
Date: 06/27/2012 07:57 PM
Subject: Re: wierd communication errors

Same issue using netty as well?

On 6/27/12 6:14 PM, Yuanyuan Tian wrote:

Hi,

I was running a giraph job where I constantly got the following communication-related errors. The symptom is that in superstep 0, most of the workers succeed but a few produce the errors below; the machines that cause the connection reset are different in each failed worker.
To rule out the possibility of a cluster setup error, I also ran a different job and it worked fine. So the error must be caused by this particular giraph job. My giraph job is a normal message-propagation type of job, except that the messages are not all of one type. Therefore, I defined a special message type (also copied in this email) that incorporates two different kinds of messages: an integer message and a double-array message. I have tried all day but still couldn't pinpoint the source of the bug. Can anyone give me some hints on what may have caused this error?

Thanks a lot,

java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1082)
    at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
    at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1077)
    ... 10 more
Caused by: java.lang.reflect.UndeclaredThrowableException
    at $Proxy3.getName(Unknown Source)
    at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:335)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp35.almaden.ibm.com/172.16.0.35:30083 failed on local exception: java.io.IOException: Connection reset by peer
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
    at org.apache.hadoop.ipc.Client.call(Client.java:1033)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
    ... 8 more
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
    at sun.nio.ch.IOUtil.read(IOUtil.java:175)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
    at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
    at java.io.FilterInputStream.read(FilterInputStream.java:116)
    at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
    at java.io.DataInputStream.readInt(DataInputStream.java:370)
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)

My special message type:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MyMessageWritable implements Writable {
    // Types 1 and 4 carry intMsg; types 2 and 3 carry arrayMsg.
    public byte msgType = 0;
    public long vertexID = -1;
    public double[] arrayMsg = null;
    public int intMsg = -1;

    public MyMessageWritable() {
    }

    public MyMessageWritable(long id, byte tp, int msg) {
        vertexID = id;
        msgType = tp;
        intMsg = msg;
    }

    public MyMessageWritable(long id, byte tp, double[] arr) {
        vertexID = id;
        msgType = tp;
        arrayMsg = arr;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        vertexID = in.readLong();
        msgType = in.readByte();
        switch (msgType) {
        case 1:
        case 4:
            intMsg = in.readInt();
            break;
        case 2:
        case 3:
            // The array length is not serialized; both sides assume MyVertex.K doubles.
            if (arrayMsg == null)
                arrayMsg = new double[MyVertex.K];
            for (int i = 0; i < MyVertex.K; i++)
                arrayMsg[i] = in.readDouble();
            break;
        default:
            throw new IOException("message type invalid: " + msgType);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(vertexID);
        out.writeByte(msgType);
        switch (msgType) {
        case 1:
        case 4:
            out.writeInt(intMsg);
            break;
        case 2:
        case 3:
            if (arrayMsg == null)
                throw new IOException("array message is null");
            for (int i = 0; i < MyVertex.K; i++)
                out.writeDouble(arrayMsg[i]);
            break;
        default:
            throw new IOException("message type invalid: " + msgType);
        }
    }
}
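
One way to take the custom message type out of the picture, independent of the cluster, is to serialize a message to a byte array and read it back locally. The following is only a minimal sketch of such a round-trip check, not a diagnosis of the connection reset. It makes several assumptions: RoundTripCheck and its nested Message class are hypothetical local copies of the write/readFields logic above (without the Hadoop Writable interface, so that it runs with the JDK alone), and K = 3 is a stand-in for MyVertex.K, whose real value is not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class RoundTripCheck {
    // Assumption: stand-in for MyVertex.K, which is not shown in the thread.
    public static final int K = 3;

    /** Hypothetical local copy of the message logic, without Hadoop's Writable. */
    public static class Message {
        public byte msgType = 0;
        public long vertexID = -1;
        public double[] arrayMsg = null;
        public int intMsg = -1;

        public Message() { }

        public Message(long id, byte tp, int msg) {
            vertexID = id; msgType = tp; intMsg = msg;
        }

        public Message(long id, byte tp, double[] arr) {
            vertexID = id; msgType = tp; arrayMsg = arr;
        }

        public void readFields(DataInput in) throws IOException {
            vertexID = in.readLong();
            msgType = in.readByte();
            switch (msgType) {
            case 1: case 4:
                intMsg = in.readInt();
                break;
            case 2: case 3:
                if (arrayMsg == null) arrayMsg = new double[K];
                for (int i = 0; i < K; i++) arrayMsg[i] = in.readDouble();
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
            }
        }

        public void write(DataOutput out) throws IOException {
            out.writeLong(vertexID);
            out.writeByte(msgType);
            switch (msgType) {
            case 1: case 4:
                out.writeInt(intMsg);
                break;
            case 2: case 3:
                if (arrayMsg == null) throw new IOException("array message is null");
                for (int i = 0; i < K; i++) out.writeDouble(arrayMsg[i]);
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
            }
        }
    }

    /** Writes the message to a byte array and reads it back into a fresh object. */
    public static Message roundTrip(Message original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            original.write(out);
            out.flush();

            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            Message copy = new Message();
            copy.readFields(in);
            // Leftover bytes would mean write() and readFields() disagree on the layout.
            if (in.available() != 0) {
                throw new IllegalStateException(
                    "readFields left " + in.available() + " unread bytes");
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Message intCopy = roundTrip(new Message(7L, (byte) 1, 42));
        System.out.println("int message: " + intCopy.intMsg);

        Message arrayCopy = roundTrip(new Message(8L, (byte) 2, new double[] {0.5, 1.5, 2.5}));
        System.out.println("array message: " + Arrays.toString(arrayCopy.arrayMsg));
    }
}
```

A message that was never given a valid type (for example one built with the no-argument constructor, so msgType is 0) makes write throw, which roundTrip surfaces as an UncheckedIOException. If a check like this passes for every message type the job sends, the serialization code itself is less likely to be the culprit, although it says nothing about why the peer reset the connection.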