This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bbaa03  Fix the capacity of the DistinctTable (#5204)
3bbaa03 is described below

commit 3bbaa03976b055eb81e93789d89e5bd11e99a3f3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Apr 3 12:04:15 2020 -0700

    Fix the capacity of the DistinctTable (#5204)
    
    In order to get a good enough approximation, we keep at least 5000 unique records if there are more of them (same as group-by behavior)
---
 .../core/query/aggregation/DistinctTable.java      | 35 ++++++++--------------
 .../function/DistinctAggregationFunction.java      | 15 +++++-----
 2 files changed, 20 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
index 6accdde..8dadf14 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.table.BaseTable;
@@ -37,6 +38,7 @@ import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.spi.utils.BytesUtils;
 
+
 /**
  * This serves the following purposes:
  *
@@ -47,21 +49,18 @@ import org.apache.pinot.spi.utils.BytesUtils;
  * uses {@link Set} to store unique records.
  */
 public class DistinctTable extends BaseTable {
-  private static final double LOAD_FACTOR = 0.75;
   private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
   private Set<Record> _uniqueRecordsSet;
   private boolean _noMoreNewRecords;
   private Iterator<Record> _sortedIterator;
 
-  public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int 
limit) {
+  public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int 
capacity) {
     // TODO: see if 64k is the right max initial capacity to use
-    // if it turns out that users always use LIMIT N > 0.75 * 64k and
-    // there are indeed that many records, then there will be resizes.
-    // The current method of setting the initial capacity as
-    // min(64k, limit/loadFactor) will not require resizes for LIMIT N
-    // where N <= 48000
-    super(dataSchema, Collections.emptyList(), orderBy, limit);
-    int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, 
Math.abs(nextPowerOfTwo((int) (limit / LOAD_FACTOR))));
+    // NOTE: The passed in capacity is calculated based on the LIMIT in the query as Math.max(limit * 5, 5000). When
+    //       LIMIT is smaller than (64 * 1024 * 0.75 (load factor) / 5 = 9830), then it is guaranteed that no resize is
+    //       required.
+    super(dataSchema, Collections.emptyList(), orderBy, capacity);
+    int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, 
HashUtil.getHashMapCapacity(capacity));
     _uniqueRecordsSet = new HashSet<>(initialCapacity);
     _noMoreNewRecords = false;
   }
@@ -151,13 +150,15 @@ public class DistinctTable extends BaseTable {
    * @param byteBuffer data to deserialize
    * @throws IOException
    */
-  public DistinctTable(ByteBuffer byteBuffer) throws IOException {
+  public DistinctTable(ByteBuffer byteBuffer)
+      throws IOException {
     // This is called by the BrokerReduceService when it de-serializes the
     // DISTINCT result from the DataTable. As of now we don't have all the
     // information to pass to super class so just pass null, empty lists
     // and the broker will set the correct information before merging the
     // data tables.
-    super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), 
Collections.emptyList(), new ArrayList<>(), 0);
+    super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), 
Collections.emptyList(), new ArrayList<>(),
+        0);
     DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
     _dataSchema = dataTable.getDataSchema();
     _uniqueRecordsSet = new HashSet<>();
@@ -214,18 +215,6 @@ public class DistinctTable extends BaseTable {
     addCapacityAndOrderByInfo(brokerRequest.getOrderBy(), 
brokerRequest.getLimit());
   }
 
-  private static int nextPowerOfTwo(int val) {
-    if (val == 0 || val == 1) {
-      return val + 1;
-    }
-    int highestBit = Integer.highestOneBit(val);
-    if (highestBit == val) {
-      return val;
-    } else {
-      return highestBit << 1;
-    }
-  }
-
   private void resize(int trimToSize) {
     _tableResizer.resizeRecordsSet(_uniqueRecordsSet, trimToSize);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
index d7a5a98..457e431 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.DistinctTable;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
 
 
@@ -43,15 +44,15 @@ import 
org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
 public class DistinctAggregationFunction implements 
AggregationFunction<DistinctTable, Comparable> {
   private final String[] _columns;
   private final List<SelectionSort> _orderBy;
-  private final int _limit;
+  private final int _capacity;
 
   public DistinctAggregationFunction(String multiColumnExpression, 
List<SelectionSort> orderBy, int limit) {
     _columns = 
multiColumnExpression.split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
     _orderBy = orderBy;
-    // use a multiplier for trim size when DISTINCT queries have ORDER BY. 
This logic
-    // is similar to what we have in GROUP BY with ORDER BY
-    // this does not guarantee 100% accuracy but still takes closer to it
-    _limit = CollectionUtils.isNotEmpty(_orderBy) ? limit * 5 : limit;
+    // NOTE: DISTINCT with order-by is similar to group-by with order-by, where we limit the maximum number of unique
+    //       records (groups) for each query to reduce the memory footprint. The result might not be 100% accurate in
+    //       certain scenarios, but should give a good enough approximation.
+    _capacity = CollectionUtils.isNotEmpty(_orderBy) ? 
GroupByUtils.getTableCapacity(limit) : limit;
   }
 
   @Override
@@ -82,7 +83,7 @@ public class DistinctAggregationFunction implements 
AggregationFunction<Distinct
         columnDataTypes[i] = 
ColumnDataType.fromDataTypeSV(blockValSets[i].getValueType());
       }
       DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
-      distinctTable = new DistinctTable(dataSchema, _orderBy, _limit);
+      distinctTable = new DistinctTable(dataSchema, _orderBy, _capacity);
       aggregationResultHolder.setValue(distinctTable);
     }
 
@@ -114,7 +115,7 @@ public class DistinctAggregationFunction implements 
AggregationFunction<Distinct
       ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
       // NOTE: Use STRING for unknown type
       Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-      return new DistinctTable(new DataSchema(_columns, columnDataTypes), 
_orderBy, _limit);
+      return new DistinctTable(new DataSchema(_columns, columnDataTypes), 
_orderBy, _capacity);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to