Repository: hbase
Updated Branches:
  refs/heads/branch-1 8da121409 -> ebd338d0d


HBASE-12596 bulkload needs to follow locality (Victor Xu)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ebd338d0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ebd338d0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ebd338d0

Branch: refs/heads/branch-1
Commit: ebd338d0d157029055e64371aeac155e2169ddd2
Parents: 8da1214
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Jul 10 06:27:19 2015 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Jul 10 06:27:19 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/HFileOutputFormat2.java     |  85 +++++++++++++--
 .../hbase/mapreduce/TestHFileOutputFormat.java  |   8 ++
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 104 +++++++++++++------
 3 files changed, 160 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
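
The patch makes HFileOutputFormat2 pass the target region's host as a favored
node when it opens each HFile writer, so bulkloaded files come out local to the
region server that will serve them. The behavior is on by default behind the
new key hbase.bulkload.locality.sensitive.enabled. A rough sketch of toggling
it during job setup (connection handling, job and table names here are
illustrative, not part of the patch):

    Configuration conf = HBaseConfiguration.create();
    // On by default; set to false to skip the per-writer region lookup,
    // e.g. when generating HFiles for a different cluster.
    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, true);
    Job job = Job.getInstance(conf, "hfile-prepare"); // illustrative job name
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("usertable")); // illustrative
        RegionLocator locator = conn.getRegionLocator(table.getName())) {
      // With locality on, configureIncrementalLoad records the table name under
      // hbase.mapreduce.hfileoutputformat.table.name for the record writer.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), locator);
    }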


http://git-wip-us.apache.org/repos/asf/hbase/blob/ebd338d0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 678b7bb..9c87807 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -40,13 +41,18 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -108,6 +114,15 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+      "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.table.name";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
      final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -191,7 +206,48 @@ public class HFileOutputFormat2
 
         // create a new WAL writer, if necessary
         if (wl == null || wl.writer == null) {
-          wl = getNewWriter(family, conf);
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            HRegionLocation loc = null;
+            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+
+            try (Connection connection = ConnectionFactory.createConnection(conf);
+                RegionLocator locator =
+                    connection.getRegionLocator(TableName.valueOf(tableName))) {
+              loc = locator.getRegionLocation(rowKey);
+            } catch (Throwable e) {
+              LOG.warn("there's something wrong when locating rowkey: " +
+                  Bytes.toString(rowKey), e);
+              loc = null;
+            }
+
+            if (null == loc) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("failed to get region location, so use default 
writer: "
+                    + Bytes.toString(rowKey));
+              }
+              wl = getNewWriter(family, conf, null);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+              }
+              InetSocketAddress initialIsa =
+                  new InetSocketAddress(loc.getHostname(), loc.getPort());
+              if (initialIsa.isUnresolved()) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("failed to resolve bind address: " + 
loc.getHostname() + ":"
+                      + loc.getPort() + ", so use default writer");
+                }
+                wl = getNewWriter(family, conf, null);
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("use favored nodes writer: " + 
initialIsa.getHostString());
+                }
+                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+              }
+            }
+          } else {
+            wl = getNewWriter(family, conf, null);
+          }
         }
 
         // we now have the proper WAL writer. full steam ahead
@@ -223,7 +279,8 @@ public class HFileOutputFormat2
       */
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
          justification="Not important")
-      private WriterLength getNewWriter(byte[] family, Configuration conf)
+      private WriterLength getNewWriter(byte[] family, Configuration conf,
+          InetSocketAddress[] favoredNodes)
           throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
@@ -245,11 +302,19 @@ public class HFileOutputFormat2
                                     .withBlockSize(blockSize);
         contextBuilder.withDataBlockEncoding(encoding);
         HFileContext hFileContext = contextBuilder.build();
-                                    
-        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-            .withOutputDir(familydir).withBloomType(bloomType)
-            .withComparator(KeyValue.COMPARATOR)
-            .withFileContext(hFileContext).build();
+
+        if (null == favoredNodes) {
+          wl.writer =
+              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build();
+        } else {
+          wl.writer =
+              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+        }
 
         this.writers.put(family, wl);
         return wl;
@@ -430,6 +495,12 @@ public class HFileOutputFormat2
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
 
+    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      // record this table name for creating writer by favored nodes
+      LOG.info("bulkload locality sensitive enabled");
+      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
+    }
+
     // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + 
tableDescriptor.getTableName());
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ebd338d0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index f3a5c4c..a4a0dad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -340,6 +340,7 @@ public class TestHFileOutputFormat  {
     HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     setupMockStartKeys(regionLocator);
+    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -794,6 +795,11 @@ public class TestHFileOutputFormat  {
     Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
   }
 
+  private void setupMockTableName(RegionLocator table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -823,6 +829,8 @@ public class TestHFileOutputFormat  {
       // pollutes the GZip codec pool with an incompatible compressor.
       conf.set("io.seqfile.compression.type", "NONE");
       conf.set("hbase.fs.tmp.dir", dir.toString());
+      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ebd338d0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index d34cfc1..94797eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -48,16 +48,15 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HadoopShims;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -73,8 +72,10 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -343,6 +344,7 @@ public class TestHFileOutputFormat2  {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     setupMockStartKeys(regionLocator);
+    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, new HTableDescriptor(), regionLocator);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -372,40 +374,62 @@ public class TestHFileOutputFormat2  {
   @Test
   public void testMRIncrementalLoad() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoad\n");
-    doIncrementalLoadTest(false);
+    doIncrementalLoadTest(false, false);
   }
 
   @Test
   public void testMRIncrementalLoadWithSplit() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true);
+    doIncrementalLoadTest(true, false);
   }
 
-  private void doIncrementalLoadTest(
-      boolean shouldChangeRegions) throws Exception {
+  /**
+   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
+   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
+   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
+   * to check region locality by comparing region locations with DN hostnames. When
+   * MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does), we
+   * can test region locality features more easily.
+   */
+  @Test
+  public void testMRIncrementalLoadWithLocality() throws Exception {
+    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
+    doIncrementalLoadTest(false, true);
+    doIncrementalLoadTest(true, true);
+  }
+
+  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
+      throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
-    byte[][] splitKeys = generateRandomSplitKeys(4);
-    util.startMiniCluster();
-    try {
-      HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
-      Admin admin = table.getConnection().getAdmin();
-      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
-      assertEquals("Should start with empty table",
-          0, util.countRows(table));
-      int numRegions = -1;
-      try (RegionLocator r = table.getRegionLocator()) {
-        numRegions = r.getStartKeys().length;
-      }
-      assertEquals("Should make 5 regions", numRegions, 5);
+    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+    int hostCount = 1;
+    int regionNum = 5;
+    if (shouldKeepLocality) {
+      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
+      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
+      hostCount = 3;
+      regionNum = 20;
+    }
+    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
+    String[] hostnames = new String[hostCount];
+    for (int i = 0; i < hostCount; ++i) {
+      hostnames[i] = "datanode_" + i;
+    }
+    util.startMiniCluster(1, hostCount, hostnames);
+
+    HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
+    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+    try (RegionLocator r = table.getRegionLocator(); Admin admin = table.getConnection().getAdmin()) {
+      assertEquals("Should start with empty table", 0, util.countRows(table));
+      int numRegions = r.getStartKeys().length;
+      assertEquals("Should make " + regionNum + " regions", numRegions, 
regionNum);
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
      runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
       // This doesn't write into the table, just makes files
-      assertEquals("HFOF should not touch actual table",
-          0, util.countRows(table));
-
+      assertEquals("HFOF should not touch actual table", 0, 
util.countRows(table));
 
       // Make sure that a directory was created for every CF
       int dir = 0;
@@ -422,8 +446,8 @@ public class TestHFileOutputFormat2  {
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getName());
-        while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
-            getRegionStates().isRegionsInTransition()) {
+        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+            .isRegionsInTransition()) {
           Threads.sleep(200);
           LOG.info("Waiting on table to finish disabling");
         }
@@ -431,8 +455,8 @@ public class TestHFileOutputFormat2  {
         byte[][] newSplitKeys = generateRandomSplitKeys(14);
         table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
 
-        while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
-            !admin.isTableAvailable(table.getName())) {
+        while (table.getRegionLocator().getAllRegionLocations().size() != 15
+            || !admin.isTableAvailable(table.getName())) {
           Thread.sleep(200);
           LOG.info("Waiting for new region assignment to happen");
         }
@@ -443,8 +467,8 @@ public class TestHFileOutputFormat2  {
 
       // Ensure data shows up
       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-      assertEquals("LoadIncrementalHFiles should put expected data in table",
-          expectedRows, util.countRows(table));
+      assertEquals("LoadIncrementalHFiles should put expected data in table", 
expectedRows,
+        util.countRows(table));
       Scan scan = new Scan();
       ResultScanner results = table.getScanner(scan);
       for (Result res : results) {
@@ -458,6 +482,17 @@ public class TestHFileOutputFormat2  {
       results.close();
       String tableDigestBefore = util.checksumRows(table);
 
+      // Check region locality
+      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
+      for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
+        hbd.add(region.getHDFSBlocksDistribution());
+      }
+      for (String hostname : hostnames) {
+        float locality = hbd.getBlockLocalityIndex(hostname);
+        LOG.info("locality of [" + hostname + "]: " + locality);
+        assertEquals(100, (int) (locality * 100));
+      }
+
       // Cause regions to reopen
       admin.disableTable(TABLE_NAME);
       while (!admin.isTableDisabled(TABLE_NAME)) {
@@ -466,9 +501,11 @@ public class TestHFileOutputFormat2  {
       }
       admin.enableTable(TABLE_NAME);
       util.waitTableAvailable(TABLE_NAME);
-      assertEquals("Data should remain after reopening of regions",
-          tableDigestBefore, util.checksumRows(table));
+      assertEquals("Data should remain after reopening of regions", 
tableDigestBefore,
+        util.checksumRows(table));
     } finally {
+      testDir.getFileSystem(conf).delete(testDir, true);
+      util.deleteTable(TABLE_NAME);
       util.shutdownMiniMapReduceCluster();
       util.shutdownMiniCluster();
     }
@@ -796,6 +833,11 @@ public class TestHFileOutputFormat2  {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
+  private void setupMockTableName(RegionLocator table) throws IOException {
+    TableName mockTableName = TableName.valueOf("mock_table");
+    Mockito.doReturn(mockTableName).when(table).getName();
+  }
+
   /**
    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -825,6 +867,8 @@ public class TestHFileOutputFormat2  {
       // pollutes the GZip codec pool with an incompatible compressor.
       conf.set("io.seqfile.compression.type", "NONE");
       conf.set("hbase.fs.tmp.dir", dir.toString());
+      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
+      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);
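
For completeness: after a locality-sensitive load, locality can be spot-checked
the way the new test does, by aggregating each region's block distribution. A
sketch (the cluster handle, table name, and hostname are placeholders):

    HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
    for (HRegion region : cluster.getRegions(tableName)) { // e.g. a MiniHBaseCluster
      hbd.add(region.getHDFSBlocksDistribution());
    }
    // An index of 1.0f means every store-file block is local to that host.
    float locality = hbd.getBlockLocalityIndex("datanode_0"); // placeholder host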
