Repository: tajo Updated Branches: refs/heads/master 6c87e8e9d -> 72ebc43d6
TAJO-1921: Hbase Storage can cause NPE when the hbase cluster is restarted Closes #918 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72ebc43d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72ebc43d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72ebc43d Branch: refs/heads/master Commit: 72ebc43d6e685dad58b2bf238fae12e51c8dec96 Parents: 6c87e8e Author: combineads <[email protected]> Authored: Tue Jan 19 13:31:23 2016 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Tue Jan 19 13:33:19 2016 -0800 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/engine/query/TestHBaseTable.java | 56 ++++++++++ .../testGetSplitsWhenRestartHBase.result | 102 +++++++++++++++++++ .../tajo/storage/hbase/HBaseTablespace.java | 49 ++++----- 4 files changed, 180 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 3d49873..f680eb0 100644 --- a/CHANGES +++ b/CHANGES @@ -90,6 +90,9 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-1921: Hbase Storage can cause NPE when the hbase cluster is restarted. + (Byunghwa Yun via hyunsik) + TAJO-2038: NPE in FileScanner#getProgress. 
(jinho) TAJO-2034: Files required for executing python functions are not copied in http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java index 13b7711..97feb65 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java @@ -26,10 +26,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoTestingCluster; @@ -1376,6 +1379,59 @@ public class TestHBaseTable extends QueryTestCaseBase { } } + @Test + public void testGetSplitsWhenRestartHBase() throws Exception { + executeString("CREATE TABLE hbase_mapped_table1 (rk text, col1 text, col2 text, col3 int) " + + "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table1', 'columns'=':key,col1:a,col2:,col3:#b', " + + "'hbase.split.rowkeys'='010,020,030,040,050,060,070,080,090')").close(); + + assertTableExists("hbase_mapped_table1"); + HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf()); + HTable htable = null; + try { + hAdmin.tableExists("hbase_table1"); + htable = new
HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table1"); + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + assertEquals(10, keys.getFirst().length); + + DecimalFormat df = new DecimalFormat("000"); + for (int i = 0; i < 100; i++) { + Put put = new Put(String.valueOf(df.format(i)).getBytes()); + put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes()); + put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes()); + put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes()); + put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes()); + put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i)); + htable.put(put); + } + + ResultSet res = executeString("select * from hbase_mapped_table1"); + assertResultSet(res); + res.close(); + + MiniHBaseCluster cluster = testingCluster.getHBaseUtil().getMiniHBaseCluster(); + HMaster master = cluster.getMaster(); + master.balanceSwitch(true); + assertEquals(1, cluster.getLiveRegionServerThreads().size()); + HRegionServer orgRegionServer = cluster.getLiveRegionServerThreads().get(0).getRegionServer(); + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + cluster.startRegionServer().waitForServerOnline(); + cluster.stopRegionServer(orgRegionServer.getServerName()); + cluster.waitForRegionServerToStop(orgRegionServer.getServerName(), 1000); + + res = executeString("select * from hbase_mapped_table1"); + assertResultSet(res); + res.close(); + } finally { + executeString("DROP TABLE hbase_mapped_table1 PURGE").close(); + hAdmin.close(); + if (htable != null) { // htable is still null if anything before or in 'new HTable(...)' threw + htable.close(); + } + } + } + private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers, boolean [] binaries, Schema schema) throws Exception { StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result b/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result new file mode 100644 index 0000000..4f5fd8f --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestHBaseTable/testGetSplitsWhenRestartHBase.result @@ -0,0 +1,102 @@ +rk,col1,col2,col3 +------------------------------- +000,a-0,{"k1":"k1-0", "k2":"k2-0"},0 +001,a-1,{"k1":"k1-1", "k2":"k2-1"},1 +002,a-2,{"k1":"k1-2", "k2":"k2-2"},2 +003,a-3,{"k1":"k1-3", "k2":"k2-3"},3 +004,a-4,{"k1":"k1-4", "k2":"k2-4"},4 +005,a-5,{"k1":"k1-5", "k2":"k2-5"},5 +006,a-6,{"k1":"k1-6", "k2":"k2-6"},6 +007,a-7,{"k1":"k1-7", "k2":"k2-7"},7 +008,a-8,{"k1":"k1-8", "k2":"k2-8"},8 +009,a-9,{"k1":"k1-9", "k2":"k2-9"},9 +010,a-10,{"k1":"k1-10", "k2":"k2-10"},10 +011,a-11,{"k1":"k1-11", "k2":"k2-11"},11 +012,a-12,{"k1":"k1-12", "k2":"k2-12"},12 +013,a-13,{"k1":"k1-13", "k2":"k2-13"},13 +014,a-14,{"k1":"k1-14", "k2":"k2-14"},14 +015,a-15,{"k1":"k1-15", "k2":"k2-15"},15 +016,a-16,{"k1":"k1-16", "k2":"k2-16"},16 +017,a-17,{"k1":"k1-17", "k2":"k2-17"},17 +018,a-18,{"k1":"k1-18", "k2":"k2-18"},18 +019,a-19,{"k1":"k1-19", "k2":"k2-19"},19 +020,a-20,{"k1":"k1-20", "k2":"k2-20"},20 +021,a-21,{"k1":"k1-21", "k2":"k2-21"},21 +022,a-22,{"k1":"k1-22", "k2":"k2-22"},22 +023,a-23,{"k1":"k1-23", "k2":"k2-23"},23 +024,a-24,{"k1":"k1-24", "k2":"k2-24"},24 +025,a-25,{"k1":"k1-25", "k2":"k2-25"},25 +026,a-26,{"k1":"k1-26", "k2":"k2-26"},26 +027,a-27,{"k1":"k1-27", "k2":"k2-27"},27 +028,a-28,{"k1":"k1-28", "k2":"k2-28"},28 +029,a-29,{"k1":"k1-29", "k2":"k2-29"},29 +030,a-30,{"k1":"k1-30", "k2":"k2-30"},30 +031,a-31,{"k1":"k1-31", "k2":"k2-31"},31 +032,a-32,{"k1":"k1-32", 
"k2":"k2-32"},32 +033,a-33,{"k1":"k1-33", "k2":"k2-33"},33 +034,a-34,{"k1":"k1-34", "k2":"k2-34"},34 +035,a-35,{"k1":"k1-35", "k2":"k2-35"},35 +036,a-36,{"k1":"k1-36", "k2":"k2-36"},36 +037,a-37,{"k1":"k1-37", "k2":"k2-37"},37 +038,a-38,{"k1":"k1-38", "k2":"k2-38"},38 +039,a-39,{"k1":"k1-39", "k2":"k2-39"},39 +040,a-40,{"k1":"k1-40", "k2":"k2-40"},40 +041,a-41,{"k1":"k1-41", "k2":"k2-41"},41 +042,a-42,{"k1":"k1-42", "k2":"k2-42"},42 +043,a-43,{"k1":"k1-43", "k2":"k2-43"},43 +044,a-44,{"k1":"k1-44", "k2":"k2-44"},44 +045,a-45,{"k1":"k1-45", "k2":"k2-45"},45 +046,a-46,{"k1":"k1-46", "k2":"k2-46"},46 +047,a-47,{"k1":"k1-47", "k2":"k2-47"},47 +048,a-48,{"k1":"k1-48", "k2":"k2-48"},48 +049,a-49,{"k1":"k1-49", "k2":"k2-49"},49 +050,a-50,{"k1":"k1-50", "k2":"k2-50"},50 +051,a-51,{"k1":"k1-51", "k2":"k2-51"},51 +052,a-52,{"k1":"k1-52", "k2":"k2-52"},52 +053,a-53,{"k1":"k1-53", "k2":"k2-53"},53 +054,a-54,{"k1":"k1-54", "k2":"k2-54"},54 +055,a-55,{"k1":"k1-55", "k2":"k2-55"},55 +056,a-56,{"k1":"k1-56", "k2":"k2-56"},56 +057,a-57,{"k1":"k1-57", "k2":"k2-57"},57 +058,a-58,{"k1":"k1-58", "k2":"k2-58"},58 +059,a-59,{"k1":"k1-59", "k2":"k2-59"},59 +060,a-60,{"k1":"k1-60", "k2":"k2-60"},60 +061,a-61,{"k1":"k1-61", "k2":"k2-61"},61 +062,a-62,{"k1":"k1-62", "k2":"k2-62"},62 +063,a-63,{"k1":"k1-63", "k2":"k2-63"},63 +064,a-64,{"k1":"k1-64", "k2":"k2-64"},64 +065,a-65,{"k1":"k1-65", "k2":"k2-65"},65 +066,a-66,{"k1":"k1-66", "k2":"k2-66"},66 +067,a-67,{"k1":"k1-67", "k2":"k2-67"},67 +068,a-68,{"k1":"k1-68", "k2":"k2-68"},68 +069,a-69,{"k1":"k1-69", "k2":"k2-69"},69 +070,a-70,{"k1":"k1-70", "k2":"k2-70"},70 +071,a-71,{"k1":"k1-71", "k2":"k2-71"},71 +072,a-72,{"k1":"k1-72", "k2":"k2-72"},72 +073,a-73,{"k1":"k1-73", "k2":"k2-73"},73 +074,a-74,{"k1":"k1-74", "k2":"k2-74"},74 +075,a-75,{"k1":"k1-75", "k2":"k2-75"},75 +076,a-76,{"k1":"k1-76", "k2":"k2-76"},76 +077,a-77,{"k1":"k1-77", "k2":"k2-77"},77 +078,a-78,{"k1":"k1-78", "k2":"k2-78"},78 +079,a-79,{"k1":"k1-79", "k2":"k2-79"},79 
+080,a-80,{"k1":"k1-80", "k2":"k2-80"},80 +081,a-81,{"k1":"k1-81", "k2":"k2-81"},81 +082,a-82,{"k1":"k1-82", "k2":"k2-82"},82 +083,a-83,{"k1":"k1-83", "k2":"k2-83"},83 +084,a-84,{"k1":"k1-84", "k2":"k2-84"},84 +085,a-85,{"k1":"k1-85", "k2":"k2-85"},85 +086,a-86,{"k1":"k1-86", "k2":"k2-86"},86 +087,a-87,{"k1":"k1-87", "k2":"k2-87"},87 +088,a-88,{"k1":"k1-88", "k2":"k2-88"},88 +089,a-89,{"k1":"k1-89", "k2":"k2-89"},89 +090,a-90,{"k1":"k1-90", "k2":"k2-90"},90 +091,a-91,{"k1":"k1-91", "k2":"k2-91"},91 +092,a-92,{"k1":"k1-92", "k2":"k2-92"},92 +093,a-93,{"k1":"k1-93", "k2":"k2-93"},93 +094,a-94,{"k1":"k1-94", "k2":"k2-94"},94 +095,a-95,{"k1":"k1-95", "k2":"k2-95"},95 +096,a-96,{"k1":"k1-96", "k2":"k2-96"},96 +097,a-97,{"k1":"k1-97", "k2":"k2-97"},97 +098,a-98,{"k1":"k1-98", "k2":"k2-98"},98 +099,a-99,{"k1":"k1-99", "k2":"k2-99"},99 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/72ebc43d/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index a7aa11b..1b81531 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.task.JobContextImpl; @@ 
-421,10 +422,10 @@ public class HBaseTablespace extends Tablespace { List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, filterCondition); HTable htable = null; - HBaseAdmin hAdmin = null; try { htable = new HTable(hbaseConf, tableDesc.getMeta().getProperty(HBaseStorageConstants.META_TABLE_KEY)); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(htable); org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { @@ -433,12 +434,18 @@ public class HBaseTablespace extends Tablespace { throw new IOException("Expecting at least one region."); } List<Fragment> fragments = new ArrayList<>(1); - Fragment fragment = new HBaseFragment( + HBaseFragment fragment = new HBaseFragment( tableDesc.getUri(), inputSourceId, htable.getName().getNameAsString(), HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); + long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); + if (regionSize == 0) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } else { + fragment.setLength(regionSize); + } fragments.add(fragment); return fragments; } @@ -473,13 +480,14 @@ public class HBaseTablespace extends Tablespace { stopRows = EMPTY_END_ROW_KEY; } - hAdmin = new HBaseAdmin(hbaseConf); - Map<ServerName, ServerLoad> serverLoadMap = new HashMap<>(); - + // reference: org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(JobContext) // region startkey -> HBaseFragment Map<byte[], HBaseFragment> fragmentMap = new HashMap<>(); for (int i = 0; i < keys.getFirst().length; i++) { HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + if (null == location) { + throw new IOException("Can't find the region of the key: " + Bytes.toStringBinary(keys.getFirst()[i])); + } byte[] regionStartKey = keys.getFirst()[i]; byte[] regionStopKey = 
keys.getSecond()[i]; @@ -497,14 +505,6 @@ public class HBaseTablespace extends Tablespace { byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && regionStopKey.length > 0 ? regionStopKey : stopRow; - String regionName = location.getRegionInfo().getRegionNameAsString(); - - ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); - if (serverLoad == null) { - serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); - serverLoadMap.put(location.getServerName(), serverLoad); - } - if (fragmentMap.containsKey(regionStartKey)) { HBaseFragment prevFragment = fragmentMap.get(regionStartKey); if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { @@ -514,27 +514,19 @@ public class HBaseTablespace extends Tablespace { prevFragment.setStopRow(fragmentStop); } } else { + byte[] regionName = location.getRegionInfo().getRegionName(); + long regionSize = sizeCalculator.getRegionSize(regionName); + HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(), inputSourceId, htable.getName().getNameAsString(), fragmentStart, fragmentStop, location.getHostname()); - - // get region size - boolean foundLength = false; - for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { - if (regionName.equals(Bytes.toString(entry.getKey()))) { - RegionLoad regionLoad = entry.getValue(); - long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; - fragment.setLength(storeFileSize); - foundLength = true; - break; - } - } - - if (!foundLength) { + if (regionSize == 0) { fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } else { + fragment.setLength(regionSize); } fragmentMap.put(regionStartKey, fragment); @@ -557,9 +549,6 @@ public class HBaseTablespace extends Tablespace { if (htable != null) { htable.close(); } - if (hAdmin != null) { - hAdmin.close(); - } } }
