Currently, we are working on setting up a production Hadoop cluster. We have a 
smaller four-node cluster (nn, snn, and two datanodes) that we are using for 
development. At a high level, we use Flume to ingest our data into HDFS. 
Because Flume may resend data on error (due to the way it handles its 
reliability guarantees), it is possible to get duplicate records. In our case, 
our batch size is 100, so we occasionally see 100 records duplicated. To 
remedy this, we de-duplicate the data with a MapReduce job.

For this job, the mapper has the following generic parameters <LongWritable, 
Text, Text, Text>:

LongWritable - input key: byte offset from the beginning of the file
Text - input value: the line of text currently being processed
Text - output key: the same line of text
Text - output value: an output-file indication
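
To make this concrete, the mapper boils down to something like the sketch 
below (simplified, not our exact code; the class name and the 
deriveDestination() helper are illustrative stand-ins):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // De-dup mapper sketch: the whole input line becomes the output key so
    // that duplicate lines are grouped together, and the output value names
    // the file the line should end up in.
    public class DedupMapper extends Mapper<LongWritable, Text, Text, Text> {

        private final Text destination = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // offset = byte offset into the input file (unused)
            // line   = the record text; emitted unchanged as the key
            destination.set(deriveDestination(line));
            context.write(line, destination);
        }

        // Stand-in for our real logic that picks the output file for a record.
        private String deriveDestination(Text line) {
            return "default";
        }
    }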

And the reducer has the following generic parameters <Text, Text, 
NullWritable, Text>:

Text - input key: the line of text
Text - input value: the output-file indication
NullWritable - output key: empty
Text - output value: the line of text
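
Likewise, the reducer is essentially the following (again a simplified 
sketch; the class name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // De-dup reducer sketch: each distinct line (the key) is written exactly
    // once, to the output file indicated by its value, via MultipleOutputs.
    public class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text line, Iterable<Text> destinations,
                Context context) throws IOException, InterruptedException {
            // All duplicates of a line arrive here together; write the line
            // once, using the first destination value as the base output path.
            String destination = destinations.iterator().next().toString();
            mos.write(NullWritable.get(), line, destination);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            mos.close();
        }
    }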

We read the standard LongWritable key and Text value provided by 
TextInputFormat coming into the mapper. From the mapper we emit the value as 
the key (i.e. the full text string that was passed in to the mapper as the 
value). This essentially means that the reducer receives Text, Text as its 
input: the key is the line of text, and the value is a destination file. The 
reducer uses the value it received from the mapper to determine an output 
file, as it uses MultipleOutputs.
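
The job itself is wired up roughly as below (a sketch; the job name and the 
input/output paths are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Driver sketch: TextInputFormat in, Text/Text out of the map phase,
    // NullWritable/Text out of the reduce phase (written via MultipleOutputs).
    public class DedupJob {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "dedup");
            job.setJarByClass(DedupJob.class);

            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }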

So, our map phase reads the lines and pushes them out as keys so that the 
sort and partition phases send any duplicate lines to a single reducer, and 
the reducer then writes each distinct key (line) no more than once to its 
output file.

Again, this does work at smaller scales, but when we increase our payload we 
start getting numerous I/O errors. It fails once we start processing tens of 
gigabytes of data. I have tried numerous different things and believe that 
either our configuration is an issue or we are doing something fundamentally 
wrong.

Any advice or things to check would be greatly appreciated. I'd be happy to 
provide more information if it would help to diagnose the problem.



Currently collected information:

[analytics@bi-data-12 ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-102
Subversion g...@github.com:hortonworks/hadoop.git -r 02ad68f19849a0a986dac36b705491f6791f9179
Compiled by jenkins on 2014-01-21T00:56Z
Compiled with protoc 2.5.0
From source with checksum 66f6c486e27479105979740178fbf0
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
[analytics@bi-data-12 ~]$


From the datanode service logs:
2014-04-25 13:23:00,349 ERROR datanode.DataNode 
(DataXceiver.java:writeBlock(540)) - 
DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, 
/disk2/hdfs/data/current, /disk3/hdfs/data/current, 
/disk4/hdfs/data/current]'}, 
localName='bi-data-12.soleocommunications.com:50010', 
storageID='DS-225705953-10.1.21.12-50010-1394477125650', 
xmitsInProgress=1}:Exception transfering block 
BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 
10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix 
available
2014-04-25 13:23:00,349 WARN  datanode.DataNode 
(DataXceiverServer.java:run(155)) - 
bi-data-12.soleocommunications.com:50010:DataXceiverServer:
java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent 
xcievers: 4096
        at 
org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,349 INFO  datanode.DataNode 
(DataXceiver.java:writeBlock(600)) - opWriteBlock 
BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception 
java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - 
bi-data-12.soleocommunications.com:50010:DataXceiver error processing 
WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /10.1.21.12:50010
java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
        at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,352 INFO  datanode.DataNode 
(DataXceiver.java:writeBlock(432)) - Receiving 
BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: 
/10.1.21.12:37311 dest: /10.1.21.12:50010



From the map job logs we see exceptions like:

2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: 
Exception in createBlockOutputStream
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at 
org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
        at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
...
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: 
Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 
remote=/10.1.21.10:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: 
Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: 
Excluding datanode 10.1.21.10:50010
2014-04-25 13:06:33,056 WARN [ResponseProcessor for block 
BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] 
org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  
for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel 
to be ready for read. ch : java.nio.channels.SocketChannel[connected 
local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
        at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
2014-04-25 13:07:18,519 ERROR [main] 
org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException 
as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 
10.1.21.12:50010 are bad. Aborting...
2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: 
Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 
are bad. Aborting...
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)

...

2014-04-25 13:32:40,197 ERROR [main] 
org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException 
as:mapred (auth:SIMPLE) 
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
 failed to create file 
/data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000
 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 
10.1.21.12 because current leaseholder is trying to recreate file.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
...

2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: 
Exception running child : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
 failed to create file 
/data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000
 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 
10.1.21.12 because current leaseholder is trying to recreate file.
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
