Hi Eric,

IMHO you do have a solution: increase the xceiver count in hdfs-site.xml,
though this might come with a performance hit:

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
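
AFAIK on Hadoop 2.x this property has been renamed to
dfs.datanode.max.transfer.threads (dfs.datanode.max.xcievers still works as
a deprecated alias), so the equivalent setting would be:

<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>4096</value>
</property>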

To get a better understanding of how xceivers work, go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure your code does not leak open file descriptors.

Thanks
Divye Sheth


On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <
ewri...@soleocommunications.com> wrote:

>  Currently, we are working on setting up a production Hadoop cluster. We
> have a smaller cluster of four nodes we are working on for development
> (nn, snn, and two datanodes). From a high level, we flume our data into
> HDFS, and because Flume may resend data on error (as part of how it
> handles its reliability guarantees), it is possible to get duplicate
> records. In our case, our batch size is 100, so we occasionally see 100
> records duplicated. In order to remedy this, we de-duplicate the data
> using a MapReduce job.
>
>
>
> For this job, the mapper has the following generic parameters:
> <LongWritable, Text, Text, Text>
>
> LongWritable – input key: byte offset from the beginning of the file
>
> Text – input value: the line of text currently being processed
>
> Text – output key: the line of text currently being processed
>
> Text – output value: the output file indication
>
>
>
> And the reducer has the following generic parameters:
> <Text, Text, NullWritable, Text>
>
> Text – input key: the line of text currently being processed
>
> Text – input value: the output file indication
>
> NullWritable – output key: empty key
>
> Text – output value: the line of text currently being processed
>
>
>
> We read the standard LongWritable, Text input key/value types, as provided
> by TextInputFormat, coming into the mapper. From the mapper we emit the
> value as the key (i.e. the full text string that was passed in as the
> value to the mapper). This essentially means the reducer receives Text,
> Text as input: the reducer key is the line of text, and the value is a
> destination file indication. The reducer uses the value it received from
> the mapper to determine an output file, since it uses MultipleOutputs.
>
>
>
> So, our map phase reads the lines and pushes them out as the keys so that
> the sort and partition phases send any duplicate lines to a single
> reducer, and the reducer then writes no more than one key (line) to its
> output file.
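>
> Roughly, the job is shaped like the sketch below (simplified, with
> illustrative class and field names rather than our actual code; the
> destination-resolution logic here is just a placeholder):
>
> // Minimal sketch of the dedup job (illustrative names, not our real code).
> import java.io.IOException;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
>
> public class DedupSketch {
>
>   // Emits the whole line as the key so duplicates group at one reducer.
>   public static class DedupMapper extends Mapper<LongWritable, Text, Text, Text> {
>     private final Text destination = new Text();
>
>     @Override
>     protected void map(LongWritable offset, Text line, Context context)
>         throws IOException, InterruptedException {
>       // Placeholder for our real routing logic (e.g. based on the input split).
>       destination.set("dedup-output");
>       context.write(line, destination);
>     }
>   }
>
>   // Writes each distinct line exactly once, routed via MultipleOutputs.
>   public static class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {
>     private MultipleOutputs<NullWritable, Text> multipleOutputs;
>
>     @Override
>     protected void setup(Context context) {
>       multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
>     }
>
>     @Override
>     protected void reduce(Text line, Iterable<Text> destinations, Context context)
>         throws IOException, InterruptedException {
>       // All duplicates of this line arrive here; take one destination and
>       // write the line a single time.
>       String dest = destinations.iterator().next().toString();
>       multipleOutputs.write(NullWritable.get(), line, dest);
>     }
>
>     @Override
>     protected void cleanup(Context context) throws IOException, InterruptedException {
>       // Closing MultipleOutputs matters; unclosed writers hold HDFS streams open.
>       multipleOutputs.close();
>     }
>   }
> }
>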
>
>
>
> Again, this does work at smaller scales, but when we increase our payload,
> we start getting numerous IO errors. It fails once we start processing
> tens of GBs of data. I have tried numerous different things and believe
> that either our configuration is an issue or we are doing something
> fundamentally wrong.
>
>
>
> Any advice or things to check would be greatly appreciated. I’d be happy
> to provide more information if it would help to diagnose the problem.
>
>
>
>
>
>
>
> *Currently collected information:*
>
>
>
> [analytics@bi-data-12 ~]$ hadoop version
>
> Hadoop 2.2.0.2.0.6.0-102
>
> Subversion g...@github.com:hortonworks/hadoop.git -r
> 02ad68f19849a0a986dac36b705491f6791f9179
>
> Compiled by jenkins on 2014-01-21T00:56Z
>
> Compiled with protoc 2.5.0
>
> From source with checksum 66f6c486e27479105979740178fbf0
>
> This command was run using
> /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
>
> [analytics@bi-data-12 ~]$
>
>
>
>
>
> From the datanode service logs:
>
> 2014-04-25 13:23:00,349 ERROR datanode.DataNode
> (DataXceiver.java:writeBlock(540)) -
> DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current,
> /disk2/hdfs/data/current, /disk3/hdfs/data/current,
> /disk4/hdfs/data/current]'}, localName='
> bi-data-12.soleocommunications.com:50010',
> storageID='DS-225705953-10.1.21.12-50010-1394477125650',
> xmitsInProgress=1}:Exception transfering block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror
> 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix
> available
>
> 2014-04-25 13:23:00,349 WARN  datanode.DataNode
> (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010
> :DataXceiverServer:
>
> java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent
> xcievers: 4096
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,349 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(600)) - opWriteBlock
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received
> exception java.io.EOFException: Premature EOF: no length prefix available
>
> 2014-04-25 13:23:00,350 ERROR datanode.DataNode
> (DataXceiver.java:run(225)) - 
> bi-data-12.soleocommunications.com:50010:DataXceiver
> error processing WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /
> 10.1.21.12:50010
>
> java.io.EOFException: Premature EOF: no length prefix available
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,352 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(432)) - Receiving
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /
> 10.1.21.12:37311 dest: /10.1.21.12:50010
>
>
>
>
>
>
>
> From the map job logs we see exceptions like:
>
> 2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: 
> Exception in createBlockOutputStream
>
> java.io.IOException: Connection reset by peer
>
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
>
>         at sun.nio.ch.IOUtil.read(IOUtil.java:191)
>
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
>
>         at 
> org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
>
>         at 
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>
>         at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at 
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
>
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> …
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
>
> org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending 
> local=/10.1.21.10:43973 remote=/
> 10.1.21.10:50010]
>
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Abandoning
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
>
> 2014-04-25 13:04:53,117 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
>
> 2014-04-25 13:06:33,056 WARN [ResponseProcessor for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012]
> org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
>
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for
> channel to be ready for read. ch :
> java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/
> 10.1.21.12:50010]
>
>         at
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
>
> 2014-04-25 13:07:18,519 ERROR [main]
> org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException
> as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
> 2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild:
> Exception running child : java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
>
>
>
> …
>
> 2014-04-25 13:32:40,197 ERROR [main] 
> org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException 
> as:mapred (auth:SIMPLE) 
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  failed to create file 
> /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000
>  for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 
> 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
>
> …
>
> 2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: 
> Exception running child : 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  failed to create file 
> /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000
>  for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 
> 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>
>
