DRILL-1346: Use HBase table size information to improve scan parallelization
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/64e43d2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/64e43d2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/64e43d2b

Branch: refs/heads/master
Commit: 64e43d2b770cb420b8f4fa9cf390b07efa5da44e
Parents: c241137
Author: Aditya Kishore <[email protected]>
Authored: Fri Aug 29 21:31:07 2014 +0530
Committer: Jacques Nadeau <[email protected]>
Committed: Fri Aug 29 14:59:04 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  20 ++-
 .../drill/exec/store/hbase/HBaseScanSpec.java   |   5 +-
 .../exec/store/hbase/TableStatsCalculator.java  | 179 +++++++++++++++++++
 .../src/main/resources/drill-module.conf        |   6 +-
 .../apache/drill/hbase/TestTableGenerator.java  |  45 +++++
 5 files changed, 246 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 8301de1..d9a3cf9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -97,6 +97,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   private boolean filterPushedDown = false;
 
+  private TableStatsCalculator statsCalculator;
+
+  private long scanSizeInBytes = 0;
+
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@@ -126,6 +130,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     this.storagePluginConfig = that.storagePluginConfig;
     this.hTableDesc = that.hTableDesc;
     this.filterPushedDown = that.filterPushedDown;
+    this.statsCalculator = that.statsCalculator;
+    this.scanSizeInBytes = that.scanSizeInBytes;
   }
 
   @Override
@@ -142,7 +148,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
       HTable table = new HTable(storagePluginConfig.getHBaseConf(), hbaseScanSpec.getTableName());
       this.hTableDesc = table.getTableDescriptor();
       NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
-      table.close();
+      statsCalculator = new TableStatsCalculator(table, hbaseScanSpec, storagePlugin.getContext().getConfig());
 
       boolean foundStartRegion = false;
       regionsToScan = new TreeMap<HRegionInfo, ServerName>();
@@ -153,10 +159,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
         }
         foundStartRegion = true;
         regionsToScan.put(regionInfo, mapEntry.getValue());
+        scanSizeInBytes += statsCalculator.getRegionSizeInBytes(regionInfo.getRegionName());
         if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
           break;
         }
       }
+
+      table.close();
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
     }
@@ -342,11 +351,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   @Override
   public ScanStats getScanStats() {
-    //TODO: look at stats for this.
-    int rowCount = (hbaseScanSpec.getFilter() != null ? 5 : 10) * regionsToScan.size();
-    int avgColumnSize = 10;
-    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+    int rowCount = (int) ((scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
+    // The following calculation is not precise, since 'columns' could specify CFs while getColsPerRow() returns the number of qualifiers.
+    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : ((float) columns.size()) / statsCalculator.getColsPerRow());
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
  }
 
   @Override
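
To see what the new getScanStats() computes, the arithmetic can be restated outside of Drill's classes. Everything named below (ScanCostSketch, the method names, the sample numbers) is hypothetical and exists only for this illustration; only the two formulas mirror the patch:

// Illustrative restatement of the new cost estimate; a sketch, not part of the patch.
final class ScanCostSketch {

  // Rows scanned ~= bytes to scan / sampled average row size; a pushed-down
  // filter is assumed to cut the returned rows in half.
  static long estimateRowCount(long scanSizeInBytes, int avgRowSizeInBytes, boolean hasFilter) {
    return (long) ((scanSizeInBytes / avgRowSizeInBytes) * (hasFilter ? 0.5 : 1));
  }

  // Projecting a subset of columns is assumed to reduce I/O proportionally;
  // imprecise when the projection names whole column families.
  static float estimateDiskCost(long scanSizeInBytes, int projectedColumns, int sampledColsPerRow) {
    return projectedColumns <= 0
        ? scanSizeInBytes
        : scanSizeInBytes * ((float) projectedColumns / sampledColsPerRow);
  }

  public static void main(String[] args) {
    long scanSize = 512L * 1024 * 1024; // pretend the regions to scan total 512 MB
    System.out.println(estimateRowCount(scanSize, 1024, true)); // 262144 rows
    System.out.println(estimateDiskCost(scanSize, 5, 10));      // ~2.68E8 bytes
  }
}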

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
index c2ee723..f9a585e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hbase;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -68,11 +69,11 @@ public class HBaseScanSpec {
   }
 
   public byte[] getStartRow() {
-    return startRow;
+    return startRow == null ? HConstants.EMPTY_START_ROW : startRow;
   }
 
   public byte[] getStopRow() {
-    return stopRow;
+    return stopRow == null ? HConstants.EMPTY_END_ROW : stopRow;
   }
 
   @JsonIgnore
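
These defaults matter because HBase treats an empty byte[] bound as "unbounded", and TableStatsCalculator (below) passes the getters straight into its sampling Scan. A hypothetical standalone illustration (the class name is invented for this example):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;

// With empty bounds the scan covers the whole table,
// i.e. this is equivalent to new Scan().
class EmptyBoundsExample {
  static Scan fullTableScan() {
    return new Scan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
  }
}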

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
new file mode 100644
index 0000000..0ce9938
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Computes the size of each region of the given table.
+ */
+public class TableStatsCalculator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
+
+  private static final String DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED = "drill.exec.hbase.scan.sizecalculator.enabled";
+
+  private static final int DEFAULT_SAMPLE_SIZE = 100;
+
+  /**
+   * Maps each region to its size in bytes.
+   */
+  private Map<byte[], Long> sizeMap = null;
+
+  private int avgRowSizeInBytes;
+
+  private int colsPerRow;
+
+  /**
+   * Computes the size of each region of the table.
+   *
+   * @param table
+   * @param hbaseScanSpec
+   * @param config
+   * @throws IOException
+   */
+  public TableStatsCalculator(HTable table, HBaseScanSpec hbaseScanSpec, DrillConfig config) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
+    try {
+      int rowsToSample = rowsToSample(config);
+      if (rowsToSample > 0) {
+        Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
+        scan.setCaching(rowsToSample < DEFAULT_SAMPLE_SIZE ? rowsToSample : DEFAULT_SAMPLE_SIZE);
+        scan.setMaxVersions(1);
+        ResultScanner scanner = table.getScanner(scan);
+        int rowSizeSum = 0, numColumnsSum = 0, rowCount = 0;
+        for (; rowCount < rowsToSample; ++rowCount) {
+          Result row = scanner.next();
+          if (row == null) {
+            break;
+          }
+          numColumnsSum += row.size();
+          rowSizeSum += row.getBytes().getLength();
+        }
+        if (rowCount > 0) { // guard against division by zero on an empty table
+          avgRowSizeInBytes = rowSizeSum / rowCount;
+          colsPerRow = numColumnsSum / rowCount;
+        }
+        scanner.close();
+      }
+
+      if (!enabled(config)) {
+        logger.info("Region size calculation disabled.");
+        return;
+      }
+
+      logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+
+      // get the regions of the table
+      Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+      Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      for (HRegionInfo regionInfo : tableRegionInfos) {
+        tableRegions.add(regionInfo.getRegionName());
+      }
+
+      ClusterStatus clusterStatus = null;
+      try {
+        clusterStatus = admin.getClusterStatus();
+      } catch (Exception e) {
+        logger.debug(e.getMessage());
+      } finally {
+        if (clusterStatus == null) {
+          return;
+        }
+      }
+
+      sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+      Collection<ServerName> servers = clusterStatus.getServers();
+      // iterate over all cluster regions, filter out this table's regions and compute their size
+      for (ServerName serverName : servers) {
+        HServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+        for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+          byte[] regionId = regionLoad.getName();
+
+          if (tableRegions.contains(regionId)) {
+            long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
+            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024*1024));
+            if (logger.isDebugEnabled()) {
+              logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
+            }
+          }
+        }
+      }
+      logger.debug("Region sizes calculated");
+    } finally {
+      admin.close();
+    }
+  }
+
+  private boolean enabled(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED)
+        ? config.getBoolean(DRILL_EXEC_HBASE_SCAN_SIZECALCULATOR_ENABLED) : true;
+  }
+
+  private int rowsToSample(DrillConfig config) {
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
+        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+  }
+
+  /**
+   * Returns the size of the given region in bytes. Returns 0 if the region was not found.
+   */
+  public long getRegionSizeInBytes(byte[] regionId) {
+    if (sizeMap == null) {
+      return (long) avgRowSizeInBytes * 1024 * 1024; // assume ~1 million rows per region
+    } else {
+      Long size = sizeMap.get(regionId);
+      if (size == null) {
+        logger.debug("Unknown region:" + Arrays.toString(regionId));
+        return 0;
+      } else {
+        return size;
+      }
+    }
+  }
+
+  public int getAvgRowSizeInBytes() {
+    return avgRowSizeInBytes;
+  }
+
+  public int getColsPerRow() {
+    return colsPerRow;
+  }
+
+}
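
A minimal sketch of the intended call pattern, assuming a class in the same package as the patch; HBaseGroupScan does the equivalent while it builds regionsToScan:

import java.io.IOException;
import java.util.NavigableMap;

import org.apache.drill.common.config.DrillConfig;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;

// Hypothetical driver: create the calculator once per table, then sum the
// per-region sizes for the regions a scan will touch.
class StatsUsageSketch {
  static long scanSizeInBytes(HTable table, HBaseScanSpec spec, DrillConfig config) throws IOException {
    TableStatsCalculator stats = new TableStatsCalculator(table, spec, config);
    long total = 0;
    NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
    for (HRegionInfo region : regions.keySet()) {
      // Per-region size from ClusterStatus, or the sampled-average fallback.
      total += stats.getRegionSizeInBytes(region.getRegionName());
    }
    return total;
  }
}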

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/resources/drill-module.conf b/contrib/storage-hbase/src/main/resources/drill-module.conf
index 0edceaf..0f0a0c6 100644
--- a/contrib/storage-hbase/src/main/resources/drill-module.conf
+++ b/contrib/storage-hbase/src/main/resources/drill-module.conf
@@ -29,6 +29,10 @@ drill.exec: {
       "hbase.zookeeper.property.clientPort" : 2181
     }
   }
-  }
+  },
+  hbase.scan: {
+    samplerows.count: 100,
+    sizecalculator.enabled: true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/64e43d2b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index 3678c78..99862e0 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.hbase;
 
 import java.util.Arrays;
+import java.util.Random;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -107,4 +108,48 @@ public class TestTableGenerator {
     table.close();
   }
 
+  public static void generateHBaseDataset2(HBaseAdmin admin, String tableName, int numberRegions) throws Exception {
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+
+    if (numberRegions > 1) {
+      admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+    } else {
+      admin.createTable(desc);
+    }
+
+    HTable table = new HTable(admin.getConfiguration(), tableName);
+
+    int rowCount = 0;
+    byte[] bytes = null;
+    final int numColumns = 5;
+    Random random = new Random();
+    int iteration = 0;
+    while (rowCount < 1000) {
+      char rowKeyChar = 'a';
+      for (int i = 0; i < numberRegions; i++) {
+        Put p = new Put((""+rowKeyChar+iteration).getBytes());
+        for (int j = 1; j <= numColumns; j++) {
+          bytes = new byte[5000];
+          random.nextBytes(bytes);
+          p.add("f".getBytes(), ("c"+j).getBytes(), bytes);
+        }
+        table.put(p);
+
+        ++rowKeyChar;
+        ++rowCount;
+      }
+      ++iteration;
+    }
+
+    table.flushCommits();
+    table.close();
+
+    admin.flush(tableName);
+  }
+
 }
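
The two keys added to drill-module.conf above resolve to drill.exec.hbase.scan.samplerows.count and drill.exec.hbase.scan.sizecalculator.enabled, which is exactly what TableStatsCalculator looks up. Assuming Drill's usual drill-override.conf mechanism, a deployment could tune them like this (the values are illustrative only):

# Hypothetical drill-override.conf fragment; the keys come from this commit.
drill.exec.hbase.scan: {
  samplerows.count: 500,         # sample more rows for a steadier average row size
  sizecalculator.enabled: false  # skip the ClusterStatus-based region sizing
}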

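For scale: generateHBaseDataset2 writes rows of 5 qualifiers of 5,000 random bytes each until at least 1,000 rows exist, so the test table carries roughly 25 MB for the size calculator to measure. A hypothetical back-of-envelope check (not part of the patch):

// Arithmetic only; mirrors the constants in generateHBaseDataset2.
class DatasetSizeCheck {
  public static void main(String[] args) {
    int rows = 1000, colsPerRow = 5, bytesPerCell = 5000;
    long totalBytes = (long) rows * colsPerRow * bytesPerCell;
    System.out.println(totalBytes);                    // 25000000 bytes
    System.out.println(totalBytes / (1024.0 * 1024));  // ~23.8 MB across the regions
  }
}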