This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbaa03 Fix the capacity of the DistinctTable (#5204)
3bbaa03 is described below
commit 3bbaa03976b055eb81e93789d89e5bd11e99a3f3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Apr 3 12:04:15 2020 -0700
Fix the capacity of the DistinctTable (#5204)
In order to get a good enough approximation, we keep at least 5000 unique
records when more than that exist (same as the group-by behavior)
---
.../core/query/aggregation/DistinctTable.java | 35 ++++++++--------------
.../function/DistinctAggregationFunction.java | 15 +++++-----
2 files changed, 20 insertions(+), 30 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
index 6accdde..8dadf14 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.data.table.BaseTable;
@@ -37,6 +38,7 @@ import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.spi.utils.BytesUtils;
+
/**
* This serves the following purposes:
*
@@ -47,21 +49,18 @@ import org.apache.pinot.spi.utils.BytesUtils;
* uses {@link Set} to store unique records.
*/
public class DistinctTable extends BaseTable {
- private static final double LOAD_FACTOR = 0.75;
private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
private Set<Record> _uniqueRecordsSet;
private boolean _noMoreNewRecords;
private Iterator<Record> _sortedIterator;
- public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int
limit) {
+ public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int
capacity) {
// TODO: see if 64k is the right max initial capacity to use
- // if it turns out that users always use LIMIT N > 0.75 * 64k and
- // there are indeed that many records, then there will be resizes.
- // The current method of setting the initial capacity as
- // min(64k, limit/loadFactor) will not require resizes for LIMIT N
- // where N <= 48000
- super(dataSchema, Collections.emptyList(), orderBy, limit);
- int initialCapacity = Math.min(MAX_INITIAL_CAPACITY,
Math.abs(nextPowerOfTwo((int) (limit / LOAD_FACTOR))));
+ // NOTE: The passed in capacity is calculated based on the LIMIT in the
query as Math.max(limit * 5, 5000). When
+ // LIMIT is smaller than (64 * 1024 * 0.75 (load factor) / 5 =
9830), then it is guaranteed that no resize is
+ // required.
+ super(dataSchema, Collections.emptyList(), orderBy, capacity);
+ int initialCapacity = Math.min(MAX_INITIAL_CAPACITY,
HashUtil.getHashMapCapacity(capacity));
_uniqueRecordsSet = new HashSet<>(initialCapacity);
_noMoreNewRecords = false;
}
@@ -151,13 +150,15 @@ public class DistinctTable extends BaseTable {
* @param byteBuffer data to deserialize
* @throws IOException
*/
- public DistinctTable(ByteBuffer byteBuffer) throws IOException {
+ public DistinctTable(ByteBuffer byteBuffer)
+ throws IOException {
// This is called by the BrokerReduceService when it de-serializes the
// DISTINCT result from the DataTable. As of now we don't have all the
// information to pass to super class so just pass null, empty lists
// and the broker will set the correct information before merging the
// data tables.
- super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList(), new ArrayList<>(), 0);
+ super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList(), new ArrayList<>(),
+ 0);
DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
_dataSchema = dataTable.getDataSchema();
_uniqueRecordsSet = new HashSet<>();
@@ -214,18 +215,6 @@ public class DistinctTable extends BaseTable {
addCapacityAndOrderByInfo(brokerRequest.getOrderBy(),
brokerRequest.getLimit());
}
- private static int nextPowerOfTwo(int val) {
- if (val == 0 || val == 1) {
- return val + 1;
- }
- int highestBit = Integer.highestOneBit(val);
- if (highestBit == val) {
- return val;
- } else {
- return highestBit << 1;
- }
- }
-
private void resize(int trimToSize) {
_tableResizer.resizeRecordsSet(_uniqueRecordsSet, trimToSize);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
index d7a5a98..457e431 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.DistinctTable;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
@@ -43,15 +44,15 @@ import
org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
public class DistinctAggregationFunction implements
AggregationFunction<DistinctTable, Comparable> {
private final String[] _columns;
private final List<SelectionSort> _orderBy;
- private final int _limit;
+ private final int _capacity;
public DistinctAggregationFunction(String multiColumnExpression,
List<SelectionSort> orderBy, int limit) {
_columns =
multiColumnExpression.split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
_orderBy = orderBy;
- // use a multiplier for trim size when DISTINCT queries have ORDER BY.
This logic
- // is similar to what we have in GROUP BY with ORDER BY
- // this does not guarantee 100% accuracy but still takes closer to it
- _limit = CollectionUtils.isNotEmpty(_orderBy) ? limit * 5 : limit;
+ // NOTE: DISTINCT with order-by is similar to group-by with order-by,
where we limit the maximum number of unique
+ // records (groups) for each query to reduce the memory footprint.
The result might not be 100% accurate in
+ // certain scenarios, but should give a good enough approximation.
+ _capacity = CollectionUtils.isNotEmpty(_orderBy) ?
GroupByUtils.getTableCapacity(limit) : limit;
}
@Override
@@ -82,7 +83,7 @@ public class DistinctAggregationFunction implements
AggregationFunction<Distinct
columnDataTypes[i] =
ColumnDataType.fromDataTypeSV(blockValSets[i].getValueType());
}
DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
- distinctTable = new DistinctTable(dataSchema, _orderBy, _limit);
+ distinctTable = new DistinctTable(dataSchema, _orderBy, _capacity);
aggregationResultHolder.setValue(distinctTable);
}
@@ -114,7 +115,7 @@ public class DistinctAggregationFunction implements
AggregationFunction<Distinct
ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
// NOTE: Use STRING for unknown type
Arrays.fill(columnDataTypes, ColumnDataType.STRING);
- return new DistinctTable(new DataSchema(_columns, columnDataTypes),
_orderBy, _limit);
+ return new DistinctTable(new DataSchema(_columns, columnDataTypes),
_orderBy, _capacity);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]