Author: mbautin
Date: Thu Feb  2 22:38:48 2012
New Revision: 1239892

URL: http://svn.apache.org/viewvc?rev=1239892&view=rev
Log:
[jira] [HBASE-5259] Normalize the RegionLocation in TableInputFormat by the 
reverse DNS lookup.

Summary:
Assuming HBase and MapReduce are running in the same cluster, the
TableInputFormat overrides the split function, which divides all the
regions of one particular table into a series of mapper tasks, so that
each mapper task processes a region or one part of a region. Ideally, a
mapper task should run on the same machine as the region server that
hosts the corresponding region.  That is why the TableInputFormat sets
the RegionLocation: so that the MapReduce framework can respect node
locality.

The code simply sets the host name of the region server as the
HRegionLocation.  However, the host name of the region server may be in
a different format from the host name of the task tracker (mapper task).
The task tracker always gets its host name by reverse DNS lookup, and
the DNS service may return a different host name format. For example,
the host name of the region server is correctly set as "a.b.c.d" while
the reverse DNS lookup may return "a.b.c.d." (with an additional dot at
the end).

So the solution is to set the RegionLocation by reverse DNS lookup as
well.  No matter what host name format the DNS system is using, the
TableInputFormat is responsible for keeping its host name format
consistent with the one used by the MapReduce framework.

Test Plan: running all the unit tests

Reviewers: Kannan, Karthik, mbautin

Reviewed By: kannan

CC: JIRA, tedyu, Liyin, Kannan, gqchen

Differential Revision: https://reviews.facebook.net/D1413


Modified:
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1239892&r1=1239891&r2=1239892&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
 Thu Feb  2 22:38:48 2012
@@ -20,15 +20,19 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
+import javax.naming.NamingException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -38,7 +42,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.DNS;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -82,6 +86,13 @@ extends InputFormat<ImmutableBytesWritab
   /** The number of mappers to assign to each region. */
   private int numMappersPerRegion = 1;
 
+  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
+  private HashMap<InetAddress, String> reverseDNSCacheMap =
+    new HashMap<InetAddress, String>();
+  
+  /** The NameServer address */
+  private String nameServer = null;
+  
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -125,6 +136,10 @@ extends InputFormat<ImmutableBytesWritab
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
+    // Get the name server address and the default value is null.
+    this.nameServer =
+      context.getConfiguration().get("hbase.nameserver.address", null);
+    
     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
@@ -192,8 +207,19 @@ extends InputFormat<ImmutableBytesWritab
           keys.getSecond()[i / numMappersPerRegion])) {
         continue;
       }
-      String regionLocation = table.getRegionLocation(splitKeys.getFirst()[i]).
-        getServerAddress().getHostname();
+      HServerAddress regionServerAddress = 
+        table.getRegionLocation(splitKeys.getFirst()[i]).getServerAddress();
+      InetAddress regionAddress =
+        regionServerAddress.getInetSocketAddress().getAddress();
+      String regionLocation;
+      try {
+        regionLocation = reverseDNS(regionAddress);
+      } catch (NamingException e) {
+        LOG.error("Cannot resolve the host name for " + regionAddress +
+            " because of " + e);
+        regionLocation = regionServerAddress.getHostname();
+      }
+      
       // determine if the given start an stop key fall into the region
       if ((startRow.length == 0 || splitKeys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, splitKeys.getSecond()[i]) < 0) &&
@@ -215,6 +241,16 @@ extends InputFormat<ImmutableBytesWritab
     }
     return splits;
   }
+  
+  private String reverseDNS(InetAddress ipAddress)
+  throws NamingException {
+    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    if (hostName == null) {
+      hostName = DNS.reverseDns(ipAddress, this.nameServer);
+      this.reverseDNSCacheMap.put(ipAddress, hostName);
+    }
+    return hostName;
+  }
 
   /**
    *


Reply via email to