[ 
https://issues.apache.org/jira/browse/DRILL-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492148#comment-16492148
 ] 

ASF GitHub Bot commented on DRILL-6442:
---------------------------------------

Ben-Zvi closed pull request #1288: DRILL-6442: Adjust Hbase disk cost & row 
count estimation when filter push down is applied
URL: https://github.com/apache/drill/pull/1288
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 9eeba245fc..97c9a95ac0 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -22,7 +22,9 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -63,20 +65,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements 
DrillHBaseConstants {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
-  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR 
= new Comparator<List<HBaseSubScanSpec>>() {
-    @Override
-    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> 
list2) {
-      return list1.size() - list2.size();
-    }
-  };
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR 
= (list1, list2) -> list1.size() - list2.size();
 
   private static final Comparator<List<HBaseSubScanSpec>> 
LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
@@ -182,12 +177,12 @@ private void init() {
   public List<EndpointAffinity> getOperatorAffinity() {
     watch.reset();
     watch.start();
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, 
DrillbitEndpoint>();
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
       endpointMap.put(ep.getAddress(), ep);
     }
 
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new 
HashMap<DrillbitEndpoint, EndpointAffinity>();
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     for (ServerName sn : regionsToScan.values()) {
       DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
       if (ep != null) {
@@ -199,14 +194,10 @@ private void init() {
         }
       }
     }
-    logger.debug("Took {} µs to get operator affinity", 
watch.elapsed(TimeUnit.NANOSECONDS)/1000);
-    return Lists.newArrayList(affinityMap.values());
+    logger.debug("Took {} µs to get operator affinity", 
watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+    return new ArrayList<>(affinityMap.values());
   }
 
-  /**
-   *
-   * @param incomingEndpoints
-   */
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
@@ -230,23 +221,23 @@ public void applyAssignments(List<DrillbitEndpoint> 
incomingEndpoints) {
     /*
      * another map with endpoint (hostname => corresponding index list) in 
'incomingEndpoints' list
      */
-    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+    Map<String, Queue<Integer>> endpointHostIndexListMap = new HashMap<>();
 
     /*
      * Initialize these two maps
      */
     for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new 
ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      endpointFragmentMapping.put(i, new ArrayList<>(maxPerEndpointSlot));
       String hostname = incomingEndpoints.get(i).getAddress();
       Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
       if (hostIndexQueue == null) {
-        hostIndexQueue = Lists.newLinkedList();
+        hostIndexQueue = new LinkedList<>();
         endpointHostIndexListMap.put(hostname, hostIndexQueue);
       }
       hostIndexQueue.add(i);
     }
 
-    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = 
Sets.newHashSet(regionsToScan.entrySet());
+    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = new 
HashSet<>(regionsToScan.entrySet());
 
     /*
      * First, we assign regions which are hosted on region servers running on 
drillbit endpoints
@@ -256,13 +247,13 @@ public void applyAssignments(List<DrillbitEndpoint> 
incomingEndpoints) {
       /*
        * Test if there is a drillbit endpoint which is also an HBase 
RegionServer that hosts the current HBase region
        */
-      Queue<Integer> endpointIndexlist = 
endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
-      if (endpointIndexlist != null) {
-        Integer slotIndex = endpointIndexlist.poll();
+      Queue<Integer> endpointIndexList = 
endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      if (endpointIndexList != null) {
+        Integer slotIndex = endpointIndexList.poll();
         List<HBaseSubScanSpec> endpointSlotScanList = 
endpointFragmentMapping.get(slotIndex);
         
endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
         // add to the tail of the slot list, to add more later in round robin 
fashion
-        endpointIndexlist.offer(slotIndex);
+        endpointIndexList.offer(slotIndex);
         // this region has been assigned
         regionsIterator.remove();
       }
@@ -271,8 +262,8 @@ public void applyAssignments(List<DrillbitEndpoint> 
incomingEndpoints) {
     /*
      * Build priority queues of slots, with ones which has tasks lesser than 
'minPerEndpointSlot' and another which have more.
      */
-    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new 
PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new 
PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new 
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new 
PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
     for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
@@ -310,12 +301,11 @@ public void applyAssignments(List<DrillbitEndpoint> 
incomingEndpoints) {
     }
 
     /* no slot should be empty at this point */
-    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : 
String.format(
-        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment 
Map: {}.",
-        incomingEndpoints, endpointFragmentMapping.toString());
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) :
+      String.format("Unable to assign tasks to some endpoints.\nEndpoints: 
%s.\nAssignment Map: %s.", incomingEndpoints, 
endpointFragmentMapping.toString());
 
     logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment 
Map: {}",
-        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, 
endpointFragmentMapping.toString());
+        watch.elapsed(TimeUnit.NANOSECONDS) / 1000, incomingEndpoints, 
endpointFragmentMapping.toString());
   }
 
   private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
@@ -347,9 +337,15 @@ public int getMaxParallelizationWidth() {
 
   @Override
   public ScanStats getScanStats() {
-    long rowCount = (long) ((scanSizeInBytes / 
statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 
0.5 : 1));
-    // the following calculation is not precise since 'columns' could specify 
CFs while getColsPerRow() returns the number of qualifier.
-    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) 
? 1 : columns.size()/statsCalculator.getColsPerRow());
+    long rowCount = scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes();
+    // the following calculation is not precise since 'columns' could specify 
CFs while getColsPerRow() returns the number of qualifier
+    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) 
? 1 : columns.size() / statsCalculator.getColsPerRow());
+    // if filter push down is used, reduce estimated row count and disk cost 
by half to ensure plan cost will be less then without it
+    if (hbaseScanSpec.getFilter() != null) {
+      rowCount = (long) (rowCount * 0.5);
+      // if during sampling we found out exact row count, no need to reduce 
number of rows
+      diskCost = statsCalculator.usedDefaultRowCount() ? diskCost * 0.5F : 
diskCost;
+    }
     return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, 
diskCost);
   }
 
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
index 379fb7cd47..b435fbdb0a 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -48,36 +48,36 @@
  * Computes size of each region for given table.
  */
 public class TableStatsCalculator {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
 
-  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L;
+  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L; // 1 million rows
 
   private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = 
"drill.exec.hbase.scan.samplerows.count";
 
   private static final int DEFAULT_SAMPLE_SIZE = 100;
 
-  /**
-   * Maps each region to its size in bytes.
-   */
+  // Maps each region to its size in bytes.
   private Map<byte[], Long> sizeMap = null;
 
   private int avgRowSizeInBytes = 1;
 
   private int colsPerRow = 1;
 
+  private long estimatedRowCount = DEFAULT_ROW_COUNT;
+
   /**
    * Computes size of each region for table.
    *
-   * @param conn
-   * @param hbaseScanSpec
-   * @param config
-   * @throws IOException
+   * @param connection connection to Hbase client
+   * @param hbaseScanSpec scan specification
+   * @param config drill configuration
+   * @param storageConfig Hbase storage configuration
    */
-  public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, 
DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
+  public TableStatsCalculator(Connection connection, HBaseScanSpec 
hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) 
throws IOException {
     TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
-    try (Admin admin = conn.getAdmin();
-         Table table = conn.getTable(tableName);
-         RegionLocator locator = conn.getRegionLocator(tableName)) {
+    try (Admin admin = connection.getAdmin();
+         Table table = connection.getTable(tableName);
+         RegionLocator locator = connection.getRegionLocator(tableName)) {
       int rowsToSample = rowsToSample(config);
       if (rowsToSample > 0) {
         Scan scan = new Scan(hbaseScanSpec.getStartRow(), 
hbaseScanSpec.getStopRow());
@@ -100,22 +100,25 @@ public TableStatsCalculator(Connection conn, 
HBaseScanSpec hbaseScanSpec, DrillC
           }
         }
         if (rowCount > 0) {
-          avgRowSizeInBytes = (int) (rowSizeSum/rowCount);
-          colsPerRow = numColumnsSum/rowCount;
+          avgRowSizeInBytes = (int) (rowSizeSum / rowCount);
+          colsPerRow = numColumnsSum / rowCount;
+          // if during sampling we receive less rows than expected, then we 
can use this number instead of default
+          estimatedRowCount = rowCount == rowsToSample ? estimatedRowCount : 
rowCount;
         }
+
         scanner.close();
       }
 
       if (!enabled(storageConfig)) {
-        logger.info("Region size calculation disabled.");
+        logger.debug("Region size calculation is disabled.");
         return;
       }
 
-      logger.info("Calculating region sizes for table '{}'.", 
tableName.getNameAsString());
+      logger.debug("Calculating region sizes for table '{}'.", 
tableName.getNameAsString());
 
-      //get regions for table
+      // get regions for table
       List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations();
-      Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
       for (HRegionLocation regionInfo : tableRegionInfos) {
         tableRegions.add(regionInfo.getRegionInfo().getRegionName());
       }
@@ -124,17 +127,17 @@ public TableStatsCalculator(Connection conn, 
HBaseScanSpec hbaseScanSpec, DrillC
       try {
         clusterStatus = admin.getClusterStatus();
       } catch (Exception e) {
-        logger.debug(e.getMessage());
+        logger.debug(e.getMessage(), e);
       } finally {
         if (clusterStatus == null) {
           return;
         }
       }
 
-      sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
       Collection<ServerName> servers = clusterStatus.getServers();
-      //iterate all cluster regions, filter regions from our table and compute 
their size
+      // iterate all cluster regions, filter regions from our table and 
compute their size
       for (ServerName serverName : servers) {
         ServerLoad serverLoad = clusterStatus.getLoad(serverName);
 
@@ -143,14 +146,12 @@ public TableStatsCalculator(Connection conn, 
HBaseScanSpec hbaseScanSpec, DrillC
 
           if (tableRegions.contains(regionId)) {
             long regionSizeMB = regionLoad.getMemStoreSizeMB() + 
regionLoad.getStorefileSizeMB();
-            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * 
(1024*1024));
-            if (logger.isDebugEnabled()) {
-              logger.debug("Region " + regionLoad.getNameAsString() + " has 
size " + regionSizeMB + "MB");
-            }
+            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * 
estimatedRowCount);
+            logger.debug("Region {} has size {} MB.", 
regionLoad.getNameAsString(), regionSizeMB);
           }
         }
       }
-      logger.debug("Region sizes calculated");
+      logger.debug("Region sizes calculated.");
     }
 
   }
@@ -160,8 +161,8 @@ private boolean enabled(HBaseStoragePluginConfig config) {
   }
 
   private int rowsToSample(DrillConfig config) {
-    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
-        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : 
DEFAULT_SAMPLE_SIZE;
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) ?
+      config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : 
DEFAULT_SAMPLE_SIZE;
   }
 
   /**
@@ -169,11 +170,11 @@ private int rowsToSample(DrillConfig config) {
    */
   public long getRegionSizeInBytes(byte[] regionId) {
     if (sizeMap == null) {
-      return (long) avgRowSizeInBytes * DEFAULT_ROW_COUNT; // 1 million rows
+      return (long) avgRowSizeInBytes * estimatedRowCount;
     } else {
       Long size = sizeMap.get(regionId);
       if (size == null) {
-        logger.debug("Unknown region:" + Arrays.toString(regionId));
+        logger.debug("Unknown region: {}.", Arrays.toString(regionId));
         return 0;
       } else {
         return size;
@@ -189,4 +190,8 @@ public int getColsPerRow() {
     return colsPerRow;
   }
 
+  public boolean usedDefaultRowCount() {
+    return estimatedRowCount == DEFAULT_ROW_COUNT;
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Adjust Hbase disk cost & row count estimation when filter push down is applied
> ------------------------------------------------------------------------------
>
>                 Key: DRILL-6442
>                 URL: https://issues.apache.org/jira/browse/DRILL-6442
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.13.0
>            Reporter: Arina Ielchiieva
>            Assignee: Arina Ielchiieva
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> Disk cost for Hbase scan is calculated based on scan size in bytes.
> {noformat}
> float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 
> 1 : columns.size() / statsCalculator.getColsPerRow());
> {noformat}
> Scan size in bytes is estimated using {{TableStatsCalculator}} with the help 
> of sampling.
> When we estimate size for the first time (before applying filter push down), 
> we sample random rows. When estimating rows after filter push down, 
> we sample only rows that satisfy the filter condition. As a result, the 
> average row size can be higher after filter push down 
> than before. Unfortunately, since disk cost depends on these calculations, 
> a plan with filter push down can end up with a higher cost than one without it. 
> Possible enhancements:
> 1. Currently the default row count is 1 million, but if sampling returns 
> fewer rows than expected, it means that our query will return no more rows 
> than this number. We can use this number instead of the default row count to 
> achieve better cost estimations.
> 2. When filter push down is applied, the row count is reduced by half in order 
> to ensure the plan with filter push down has a lower cost. The same should be 
> done for disk cost as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to