Currently, we are working on setting up a production Hadoop cluster. We have a smaller four-node cluster that we are using for development (NN, SNN, and two datanodes). From a high level, we flume our data into HDFS, and because Flume may resend data on error (due to the way it handles its reliability guarantees), it is possible to get duplicate records. In our case, the batch size is 100, so we occasionally see 100 records duplicated. To remedy this, we de-duplicate the data with a MapReduce job.
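For concreteness, here is a stripped-down sketch of what the job looks like (class names and the output-file logic are illustrative, not our exact code; the driver setup for MultipleOutputs is omitted):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class Dedup {

    public static class DedupMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text outputFile = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Derive the output-file indication; we base it on the input
            // file name here (our exact logic is omitted).
            FileSplit split = (FileSplit) context.getInputSplit();
            outputFile.set(split.getPath().getName());
            // Emit the full line as the key so that sort/partition sends
            // all duplicates of a line to a single reducer.
            context.write(line, outputFile);
        }
    }

    public static class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> out;

        @Override
        protected void setup(Context context) {
            out = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text line, Iterable<Text> dests, Context context)
                throws IOException, InterruptedException {
            // All values for a given line should name the same destination,
            // so take the first one and write the line exactly once.
            String dest = dests.iterator().next().toString();
            out.write(NullWritable.get(), line, dest);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            out.close();
        }
    }
}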
For this job, the mapper has the following generic parameters: <LongWritable, Text, Text, Text>

LongWritable - byte offset from the beginning of the file (input key)
Text - line of text currently being processed (input value)
Text - line of text currently being processed (output key)
Text - output file indication (output value)

And the reducer has the following generic parameters: <Text, Text, NullWritable, Text>

Text - line of text currently being processed (input key)
Text - output file indication (input value)
NullWritable - empty key (output key)
Text - line of text currently being processed (output value)

We read the standard LongWritable, Text input key/value types provided by TextInputFormat into the mapper. From the mapper we emit the value as the key (i.e., the full text string that was passed in as the mapper's value), which means the reducer receives Text, Text as input: the key is the line of text, and the value is a destination file indication. The reducer uses the value it receives from the mapper to determine an output file, as it uses MultipleOutputs. So, our map phase reads the lines and pushes them out as keys so that the sort and partition phases send all duplicates of a line to a single reducer, and the reducer then writes each key (line) no more than once to its output file.

Again, this does work at smaller scales, but when we increase our payload we start getting numerous IO errors; it fails once we are processing tens of GBs of data. I have tried numerous different things and believe that either our configuration is an issue or we are doing something fundamentally wrong. Any advice or things to check would be greatly appreciated, and I'd be happy to provide more information if it would help diagnose the problem.

Currently collected information:

[analytics@bi-data-12 ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-102
Subversion g...@github.com:hortonworks/hadoop.git -r 02ad68f19849a0a986dac36b705491f6791f9179
Compiled by jenkins on 2014-01-21T00:56Z
Compiled with protoc 2.5.0
From source with checksum 66f6c486e27479105979740178fbf0
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
[analytics@bi-data-12 ~]$

From the datanode service logs:

2014-04-25 13:23:00,349 ERROR datanode.DataNode (DataXceiver.java:writeBlock(540)) - DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, /disk2/hdfs/data/current, /disk3/hdfs/data/current, /disk4/hdfs/data/current]'}, localName='bi-data-12.soleocommunications.com:50010', storageID='DS-225705953-10.1.21.12-50010-1394477125650', xmitsInProgress=1}:Exception transfering block BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,349 WARN datanode.DataNode (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010:DataXceiverServer: java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
        at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,349 INFO datanode.DataNode (DataXceiver.java:writeBlock(600)) - opWriteBlock BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.1.21.12:37307 dest: /10.1.21.12:50010
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,352 INFO datanode.DataNode (DataXceiver.java:writeBlock(432)) - Receiving BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /10.1.21.12:37311 dest: /10.1.21.12:50010

From the map job logs we see exceptions like:

2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
...
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
2014-04-25 13:06:33,056 WARN [ResponseProcessor for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
2014-04-25 13:07:18,519 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
...
2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
...
2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
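One line that jumps out at me above is the "Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096" warning. If I understand correctly, on Hadoop 2.x that limit is governed by dfs.datanode.max.transfer.threads in hdfs-site.xml (4096 is the default, which we have not changed); if raising it were a sane workaround, I assume the change would look something like this on each datanode:

<!-- hdfs-site.xml; the 8192 value is a guess, not a recommendation -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>8192</value>
</property>

Is that the right knob, or is hitting that ceiling at all a sign that our job itself is doing something wrong?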