I should have mentioned that the IOException is from the DataNode (if not 
obvious from the stacktrace :)).  The DFSClient continues along happily by 
fetching the block from a different replica (as one would expect).
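
From the application's point of view nothing actually fails -- a plain read 
just works, along these lines (illustrative sketch only; the path is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class ReadDespiteStaleReplica {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical path; any file whose excess replica was just pruned.
        FSDataInputStream in = fs.open(new Path("/tmp/job.split"));
        try {
          // The DFSClient absorbs the "Block ... is not valid" response and
          // retries against another replica; the caller just sees the bytes.
          IOUtils.copyBytes(in, System.out, conf, false);
        } finally {
          in.close();
        }
      }
    }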

-----Original Message-----
From: Sirianni, Eric [mailto:eric.siria...@netapp.com] 
Sent: Monday, August 19, 2013 10:37 AM
To: hdfs-dev@hadoop.apache.org
Subject: RE: mapred replication

Thanks Chris and others for the detailed explanation.

I was aware of the basic rationale behind having a higher replication factor 
for mapred job files - thanks for taking the time to elaborate and share with 
those on this list.

After thinking about it a bit more offline, I too speculated that the choice of 
altering the rep factor post file-create was to limit the size of the client 
write pipeline and use background inter-datanode replication to create the 
additional replicas.  Thanks for confirming that intuition.
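
To make that trade-off concrete, here's a rough sketch of the two approaches 
using the plain FileSystem API (illustrative only -- the path and the config 
lookups are mine, not what the job client literally does):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SubmitReplicationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path splitFile = new Path("/tmp/job.split");  // hypothetical path
        short submitRep = (short) conf.getInt("mapred.submit.replication", 10);

        // Approach A: ask for the full replication factor up front.  The
        // client write pipeline is then submitRep datanodes long, and close()
        // waits for acks from every node in that pipeline.
        FSDataOutputStream a = fs.create(splitFile, true,
            conf.getInt("io.file.buffer.size", 4096), submitRep,
            fs.getDefaultBlockSize());
        a.close();

        // Approach B (what the job client does): write through a
        // default-sized pipeline, then raise the target replication.  The
        // namenode sees the file as under-replicated and schedules background
        // inter-datanode copies for the remaining replicas.
        FSDataOutputStream b = fs.create(splitFile, true);
        b.close();
        fs.setReplication(splitFile, submitRep);
      }
    }

With approach B the close() returns after the usual dfs.replication-sized 
pipeline acks, and the extra replicas show up asynchronously.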

Regarding the IOExceptions, here is the stack trace in question:

WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(10.63.150.50:50010, id=DS-1611001133, infoPort=50075, ipcPort=50020):Got exception while serving blk_7363978388743975861_1030 to /10.63.150.49:
java.io.IOException: Block blk_7363978388743975861_1030 is not valid.
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getBlockFile(FSDataset.java:1059)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getLength(FSDataset.java:1022)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getVisibleLength(FSDataset.java:1032)
        at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:115)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:194)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:104)
        at java.lang.Thread.run(Thread.java:679)

I also speculated that this is due to the way that invalidates are processed 
asynchronously at the namenode.  A quick look at the 
chooseExcessReplicates()->addToInvalidates() path seems to indicate that the 
NameNode does not actually remove the pruned replica from the BlocksMap until 
the subsequent blockReport is received.  This can leave a substantial window 
where the NameNode can return bogus replica locations to clients.

There is another code path FSNamesystem.invalidateBlock() that does proactively 
update the BlocksMap (via FSNamesystem.removeStoredBlock()) after updating the 
recentInvalidateSets.  Perhaps the excess replica pruning path should include 
such a BlocksMap update as well?  Or even better, just push the common BlocksMap 
removal into the addToInvalidates() method so that all callers get that behavior.
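
To make the window concrete, here's a toy model of the sequence as I read it 
(plain Java collections only -- this is *not* the real FSNamesystem/BlocksMap 
code, and the structures are simplified guesses):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class InvalidateWindowSketch {
      // block id -> datanodes the namenode will hand out as locations
      static Map<String, Set<String>> blocksMap =
          new HashMap<String, Set<String>>();
      // datanode -> blocks queued for deletion (stand-in for recentInvalidateSets)
      static Map<String, Set<String>> recentInvalidateSets =
          new HashMap<String, Set<String>>();

      // Current behavior as I read it: queue the deletion, leave the
      // advertised locations untouched until the next block report.
      static void addToInvalidates(String block, String datanode) {
        Set<String> pending = recentInvalidateSets.get(datanode);
        if (pending == null) {
          pending = new HashSet<String>();
          recentInvalidateSets.put(datanode, pending);
        }
        pending.add(block);
        // Proposed change: also prune the location here, so every caller
        // (including the excess-replica path) stops handing it out:
        // blocksMap.get(block).remove(datanode);
      }

      public static void main(String[] args) {
        blocksMap.put("blk_1",
            new HashSet<String>(Arrays.asList("dn1", "dn2", "dn3", "dn4")));
        addToInvalidates("blk_1", "dn4");  // excess replica chosen for pruning
        // Until dn4's next block report is processed, a client asking for
        // locations can still be sent to dn4 and hit "Block ... is not valid".
        System.out.println("locations still advertised: " + blocksMap.get("blk_1"));
      }
    }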

Maybe I'm missing something - is there a legitimate reason for the NameNode to 
keep a replica's metadata in the BlocksMap after it has already decided to 
invalidate said replica?

Eric


-----Original Message-----
From: Robert Evans [mailto:ev...@yahoo-inc.com] 
Sent: Monday, August 19, 2013 10:11 AM
To: hdfs-dev@hadoop.apache.org
Subject: Re: mapred replication

Without the stack trace of the exceptions it is hard to tell.  The pruning
is asynchronous, but so is a node crashing with a replica on it.  The
client is supposed to detect this situation and find a new replica that
works.  I am not that familiar with the code, but I believe in some if not
all of these cases it will log the exception to indicate that something
bad happened, but it recovered.

--Bobby

On 8/16/13 4:40 PM, "Jay Vyas" <jayunit...@gmail.com> wrote:

>Why should this lead to an IOException?  Is it because the pruning of
>replicas is asynchronous and the datanodes try to access nonexistent
>files?  If so that seems like a pretty major bug
>
>
>On Fri, Aug 16, 2013 at 5:21 PM, Chris Nauroth <cnaur...@hortonworks.com> wrote:
>
>> Hi Eric,
>>
>> Yes, this is intentional.  The job.xml file and the job jar file get read
>> from every node running a map or reduce task.  Because of this, using a
>> higher than normal replication factor on these files improves locality.
>> More than 3 task slots will have access to local replicas.  These files
>> tend to be much smaller than the actual data read by a job, so there tends
>> to be little harm done in terms of disk space consumption.
>>
>> Why not create the file initially with 10 replicas instead of creating it
>> with 3 and then dialing up?  I imagine this is so that job submission
>> doesn't block on a synchronous write to a long pipeline.  The extra
>> replicas aren't necessary for correctness, and a long-running job will get
>> the locality benefits in the long term once more replicas are created in
>> the background.
>>
>> I recommend submitting a new jira describing the problem that you saw.  We
>> probably can handle this better, and a jira would be a good place to
>> discuss the trade-offs.  A few possibilities:
>>
>> Log a warning if mapred.submit.replication < dfs.replication.
>> Skip resetting replication if mapred.submit.replication <= dfs.replication.
>> Fail with error if mapred.submit.replication < dfs.replication.
>>
>> Chris Nauroth
>> Hortonworks
>> http://hortonworks.com/
>>
>>
>>
>> On Thu, Aug 15, 2013 at 6:21 AM, Sirianni, Eric <eric.siria...@netapp.com>
>> wrote:
>>
>> > In debugging some replication issues in our HDFS environment, I noticed
>> > that the MapReduce framework uses the following algorithm for setting the
>> > replication on submitted job files:
>> >
>> > 1.     Create the file with *default* DFS replication factor (i.e.
>> > 'dfs.replication')
>> >
>> > 2.     Subsequently alter the replication of the file based on the
>> > 'mapred.submit.replication' config value
>> >
>> >   private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
>> >       Configuration job)  throws IOException {
>> >     FSDataOutputStream out = FileSystem.create(fs, splitFile,
>> >         new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
>> >     int replication = job.getInt("mapred.submit.replication", 10);
>> >     fs.setReplication(splitFile, (short)replication);
>> >     writeSplitHeader(out);
>> >     return out;
>> >   }
>> >
>> > If I understand correctly, the net functional effect of this approach is
>> > that
>> >
>> > -       The initial write pipeline is set up with 'dfs.replication'
>> > nodes (i.e. 3)
>> >
>> > -       The namenode triggers additional inter-datanode replications in
>> > the background (as it detects the blocks as "under-replicated").
>> >
>> > I'm assuming this is intentional?  Alternatively, if the
>> > mapred.submit.replication was specified on initial create, the write
>> > pipeline would be significantly larger.
>> >
>> > The reason I noticed is that we had inadvertently specified
>> > mapred.submit.replication as *less than* dfs.replication in our
>> > configuration, which caused a bunch of excess replica pruning (and
>> > ultimately IOExceptions in our datanode logs).
>> >
>> > Thanks,
>> > Eric
>> >
>> >
>>
>
>
>
>-- 
>Jay Vyas
>http://jayunit100.blogspot.com
