This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4967780802 Configure final reduce phase threads for heavy aggregation
functions (#14662)
4967780802 is described below
commit 496778080232f7bff961dbaa56dca4c2b6f1f9b5
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jan 24 05:34:04 2025 +0800
Configure final reduce phase threads for heavy aggregation functions
(#14662)
* Configure final reduce phase threads for heavy aggregation functions
* Address comments
* Add tests with numThreadsForFinalReduce
---
.../common/utils/config/QueryOptionsUtils.java | 13 +++
.../core/data/table/ConcurrentIndexedTable.java | 5 +-
.../apache/pinot/core/data/table/IndexedTable.java | 101 +++++++++++++++++++--
.../pinot/core/data/table/SimpleIndexedTable.java | 6 +-
.../table/UnboundedConcurrentIndexedTable.java | 6 +-
.../operator/combine/GroupByCombineOperator.java | 3 +-
.../core/plan/maker/InstancePlanMakerImplV2.java | 17 ++++
.../core/query/reduce/GroupByDataTableReducer.java | 2 +-
.../core/query/request/context/QueryContext.java | 21 +++++
.../org/apache/pinot/core/util/GroupByUtils.java | 35 ++++---
.../accounting/ResourceManagerAccountingTest.java | 3 +-
.../pinot/core/data/table/IndexedTableTest.java | 19 ++--
.../pinot/integration/tests/custom/ArrayTest.java | 4 +-
.../integration/tests/custom/BytesTypeTest.java | 6 +-
.../integration/tests/custom/CpcSketchTest.java | 5 +-
.../CustomDataQueryClusterIntegrationTest.java | 16 ++--
.../tests/custom/FloatingPointDataTypeTest.java | 4 +-
.../integration/tests/custom/GeoSpatialTest.java | 5 +-
.../integration/tests/custom/JsonPathTest.java | 4 +-
.../tests/custom/MapFieldTypeRealtimeTest.java | 4 +-
.../integration/tests/custom/MapFieldTypeTest.java | 4 +-
.../integration/tests/custom/MapTypeTest.java | 4 +-
.../integration/tests/custom/SumPrecisionTest.java | 5 +-
.../integration/tests/custom/TextIndicesTest.java | 4 +-
.../integration/tests/custom/ThetaSketchTest.java | 4 +-
.../integration/tests/custom/TimestampTest.java | 5 +-
.../integration/tests/custom/TupleSketchTest.java | 5 +-
.../pinot/integration/tests/custom/ULLTest.java | 5 +-
.../pinot/integration/tests/custom/VectorTest.java | 4 +-
.../integration/tests/custom/WindowFunnelTest.java | 65 +++++++++++--
.../apache/pinot/perf/BenchmarkCombineGroupBy.java | 2 +-
.../apache/pinot/perf/BenchmarkIndexedTable.java | 4 +-
.../apache/pinot/spi/utils/CommonConstants.java | 11 ++-
33 files changed, 314 insertions(+), 87 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5f88a9691c..5e8ba86643 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -248,6 +248,19 @@ public class QueryOptionsUtils {
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD,
groupByTrimThreshold);
}
+ @Nullable
+ public static Integer getNumThreadsExtractFinalResult(Map<String, String>
queryOptions) {
+ String numThreadsExtractFinalResultString =
queryOptions.get(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT);
+ return checkedParseInt(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT,
numThreadsExtractFinalResultString, 1);
+ }
+
+ @Nullable
+ public static Integer getChunkSizeExtractFinalResult(Map<String, String>
queryOptions) {
+ String chunkSizeExtractFinalResultString =
+ queryOptions.get(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+ return checkedParseInt(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT,
chunkSizeExtractFinalResultString, 1);
+ }
+
public static boolean isNullHandlingEnabled(Map<String, String>
queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index 871eea7c26..fd75284324 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.data.table;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pinot.common.utils.DataSchema;
@@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable {
private final ReentrantReadWriteLock _readWriteLock = new
ReentrantReadWriteLock();
public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
QueryContext queryContext, int resultSize,
- int trimSize, int trimThreshold, int initialCapacity) {
+ int trimSize, int trimThreshold, int initialCapacity, ExecutorService
executorService) {
super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold,
- new ConcurrentHashMap<>(initialCapacity));
+ new ConcurrentHashMap<>(initialCapacity), executorService);
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index bce224eb3a..d96854ccb5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -19,17 +19,24 @@
package org.apache.pinot.core.data.table;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
/**
@@ -37,6 +44,7 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {
+ private final ExecutorService _executorService;
protected final Map<Key, Record> _lookupMap;
protected final boolean _hasFinalInput;
protected final int _resultSize;
@@ -46,6 +54,8 @@ public abstract class IndexedTable extends BaseTable {
protected final TableResizer _tableResizer;
protected final int _trimSize;
protected final int _trimThreshold;
+ protected final int _numThreadsExtractFinalResult;
+ protected final int _chunkSizeExtractFinalResult;
protected Collection<Record> _topRecords;
private int _numResizes;
@@ -63,13 +73,14 @@ public abstract class IndexedTable extends BaseTable {
* @param lookupMap Map from keys to records
*/
protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput,
QueryContext queryContext, int resultSize,
- int trimSize, int trimThreshold, Map<Key, Record> lookupMap) {
+ int trimSize, int trimThreshold, Map<Key, Record> lookupMap,
ExecutorService executorService) {
super(dataSchema);
Preconditions.checkArgument(resultSize >= 0, "Result size can't be
negative");
Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative");
Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be
negative");
+ _executorService = executorService;
_lookupMap = lookupMap;
_hasFinalInput = hasFinalInput;
_resultSize = resultSize;
@@ -84,6 +95,10 @@ public abstract class IndexedTable extends BaseTable {
assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold ==
Integer.MAX_VALUE);
_trimSize = trimSize;
_trimThreshold = trimThreshold;
+ // NOTE: The upper limit of threads number for final reduce is set to 2 *
number of available processors by default
+ _numThreadsExtractFinalResult =
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+ Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+ _chunkSizeExtractFinalResult =
queryContext.getChunkSizeExtractFinalResult();
}
@Override
@@ -157,14 +172,88 @@ public abstract class IndexedTable extends BaseTable {
for (int i = 0; i < numAggregationFunctions; i++) {
columnDataTypes[i + _numKeyColumns] =
_aggregationFunctions[i].getFinalResultColumnType();
}
- for (Record record : _topRecords) {
- Object[] values = record.getValues();
- for (int i = 0; i < numAggregationFunctions; i++) {
- int colId = i + _numKeyColumns;
- values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+ // Submit task when the EXECUTOR_SERVICE is not overloaded
+ if (numThreadsExtractFinalResult > 1) {
+ // Multi-threaded final reduce
+ List<Future<Void>> futures = new
ArrayList<>(numThreadsExtractFinalResult);
+ try {
+ List<Record> topRecordsList = new ArrayList<>(_topRecords);
+ int chunkSize = (topRecordsList.size() +
numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+ for (int threadId = 0; threadId < numThreadsExtractFinalResult;
threadId++) {
+ int startIdx = threadId * chunkSize;
+ int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+ if (startIdx < endIdx) {
+ // Submit a task for processing a chunk of values
+ futures.add(_executorService.submit(new TraceCallable<Void>() {
+ @Override
+ public Void callJob() {
+ for (int recordIdx = startIdx; recordIdx < endIdx;
recordIdx++) {
+ Object[] values =
topRecordsList.get(recordIdx).getValues();
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ int colId = i + _numKeyColumns;
+ values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ }
+ }
+ return null;
+ }
+ }));
+ }
+ }
+ // Wait for all tasks to complete
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // Cancel all running tasks
+ for (Future<Void> future : futures) {
+ future.cancel(true);
+ }
+ throw new RuntimeException("Error during multi-threaded final
reduce", e);
}
+ } else {
+ for (Record record : _topRecords) {
+ Object[] values = record.getValues();
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ int colId = i + _numKeyColumns;
+ values[colId] =
_aggregationFunctions[i].extractFinalResult(values[colId]);
+ }
+ }
+ }
+ }
+ }
+
+ private int inferNumThreadsExtractFinalResult() {
+ if (_numThreadsExtractFinalResult > 1) {
+ return _numThreadsExtractFinalResult;
+ }
+ if (containsExpensiveAggregationFunctions()) {
+ int parallelChunkSize = _chunkSizeExtractFinalResult;
+ if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
+ int estimatedThreads = (int) Math.ceil((double) _topRecords.size() /
parallelChunkSize);
+ if (estimatedThreads == 0) {
+ return 1;
+ }
+ return Math.min(estimatedThreads,
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+ }
+ }
+ // Default to 1 thread
+ return 1;
+ }
+
+ private boolean containsExpensiveAggregationFunctions() {
+ for (AggregationFunction aggregationFunction : _aggregationFunctions) {
+ switch (aggregationFunction.getType()) {
+ case FUNNELCOMPLETECOUNT:
+ case FUNNELCOUNT:
+ case FUNNELMATCHSTEP:
+ case FUNNELMAXSTEP:
+ return true;
+ default:
+ break;
}
}
+ return false;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index df89c3a8e1..e05f8dea9c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.data.table;
import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -31,8 +32,9 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
public class SimpleIndexedTable extends IndexedTable {
public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput,
QueryContext queryContext, int resultSize,
- int trimSize, int trimThreshold, int initialCapacity) {
- super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold, new HashMap<>(initialCapacity));
+ int trimSize, int trimThreshold, int initialCapacity, ExecutorService
executorService) {
+ super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize,
trimThreshold, new HashMap<>(initialCapacity),
+ executorService);
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
index f397ac0e8c..93ec7a967a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/UnboundedConcurrentIndexedTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.table;
+import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -36,8 +37,9 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean
hasFinalInput, QueryContext queryContext,
- int resultSize, int initialCapacity) {
- super(dataSchema, hasFinalInput, queryContext, resultSize,
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
+ int resultSize, int initialCapacity, ExecutorService executorService) {
+ super(dataSchema, hasFinalInput, queryContext, resultSize,
Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity,
+ executorService);
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 633fb7d5e6..3439bc9109 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -111,7 +111,8 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
if (_indexedTable == null) {
synchronized (this) {
if (_indexedTable == null) {
- _indexedTable =
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext,
_numTasks);
+ _indexedTable =
GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext,
_numTasks,
+ _executorService);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 82f1549971..82aa1e25d2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -94,6 +94,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
// set as pinot.server.query.executor.groupby.trim.threshold
public static final String GROUPBY_TRIM_THRESHOLD_KEY =
"groupby.trim.threshold";
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+ public static final int DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT = 1;
+ public static final int DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT = 10_000;
private static final Logger LOGGER =
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
@@ -268,6 +270,21 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
} else {
queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
}
+ // Set numThreadsExtractFinalResult
+ Integer numThreadsExtractFinalResult =
QueryOptionsUtils.getNumThreadsExtractFinalResult(queryOptions);
+ if (numThreadsExtractFinalResult != null) {
+
queryContext.setNumThreadsExtractFinalResult(numThreadsExtractFinalResult);
+ } else {
+
queryContext.setNumThreadsExtractFinalResult(DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT);
+ }
+ // Set chunkSizeExtractFinalResult
+ Integer chunkSizeExtractFinalResult =
+ QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);
+ if (chunkSizeExtractFinalResult != null) {
+
queryContext.setChunkSizeExtractFinalResult(chunkSizeExtractFinalResult);
+ } else {
+
queryContext.setChunkSizeExtractFinalResult(DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+ }
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index c53be31ed5..e1db966f1b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -240,7 +240,7 @@ public class GroupByDataTableReducer implements
DataTableReducer {
// Create an indexed table to perform the reduce.
IndexedTable indexedTable =
GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0),
_queryContext, reducerContext,
- numReduceThreadsToUse);
+ numReduceThreadsToUse, reducerContext.getExecutorService());
// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreads is <= numDataTables, each group will have
at least one data table.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index e5ce066806..a804d1f3bb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -124,6 +124,11 @@ public class QueryContext {
private int _minServerGroupTrimSize =
InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
// Trim threshold to use for server combine for SQL GROUP BY
private int _groupTrimThreshold =
InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
+ // Number of threads to use for final reduce
+ private int _numThreadsExtractFinalResult =
InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
+ // Parallel chunk size for final reduce
+ private int _chunkSizeExtractFinalResult =
+ InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
@@ -411,6 +416,22 @@ public class QueryContext {
_groupTrimThreshold = groupTrimThreshold;
}
+ public int getNumThreadsExtractFinalResult() {
+ return _numThreadsExtractFinalResult;
+ }
+
+ public void setNumThreadsExtractFinalResult(int
numThreadsExtractFinalResult) {
+ _numThreadsExtractFinalResult = numThreadsExtractFinalResult;
+ }
+
+ public int getChunkSizeExtractFinalResult() {
+ return _chunkSizeExtractFinalResult;
+ }
+
+ public void setChunkSizeExtractFinalResult(int chunkSizeExtractFinalResult) {
+ _chunkSizeExtractFinalResult = chunkSizeExtractFinalResult;
+ }
+
public boolean isNullHandlingEnabled() {
return _nullHandlingEnabled;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index ac25d4a31b..5da707f61e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.util;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.HashUtil;
@@ -49,7 +50,7 @@ public final class GroupByUtils {
/**
* Returns the capacity of the table required by the given query.
* NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups
is configurable to tune the table size and
- * result accuracy.
+ * result accuracy.
*/
public static int getTableCapacity(int limit, int minNumGroups) {
long capacityByLimit = limit * 5L;
@@ -93,7 +94,7 @@ public final class GroupByUtils {
* Creates an indexed table for the combine operator given a sample results
block.
*/
public static IndexedTable
createIndexedTableForCombineOperator(GroupByResultsBlock resultsBlock,
- QueryContext queryContext, int numThreads) {
+ QueryContext queryContext, int numThreads, ExecutorService
executorService) {
DataSchema dataSchema = resultsBlock.getDataSchema();
int numGroups = resultsBlock.getNumGroups();
int limit = queryContext.getLimit();
@@ -119,7 +120,8 @@ public final class GroupByUtils {
resultSize = limit;
}
int initialCapacity = getIndexedTableInitialCapacity(resultSize,
numGroups, minInitialIndexedTableCapacity);
- return getTrimDisabledIndexedTable(dataSchema, false, queryContext,
resultSize, initialCapacity, numThreads);
+ return getTrimDisabledIndexedTable(dataSchema, false, queryContext,
resultSize, initialCapacity, numThreads,
+ executorService);
}
int resultSize;
@@ -132,10 +134,11 @@ public final class GroupByUtils {
int trimThreshold = getIndexedTableTrimThreshold(trimSize,
queryContext.getGroupTrimThreshold());
int initialCapacity = getIndexedTableInitialCapacity(trimThreshold,
numGroups, minInitialIndexedTableCapacity);
if (trimThreshold == Integer.MAX_VALUE) {
- return getTrimDisabledIndexedTable(dataSchema, false, queryContext,
resultSize, initialCapacity, numThreads);
+ return getTrimDisabledIndexedTable(dataSchema, false, queryContext,
resultSize, initialCapacity, numThreads,
+ executorService);
} else {
return getTrimEnabledIndexedTable(dataSchema, false, queryContext,
resultSize, trimSize, trimThreshold,
- initialCapacity, numThreads);
+ initialCapacity, numThreads, executorService);
}
}
@@ -143,7 +146,7 @@ public final class GroupByUtils {
* Creates an indexed table for the data table reducer given a sample data
table.
*/
public static IndexedTable createIndexedTableForDataTableReducer(DataTable
dataTable, QueryContext queryContext,
- DataTableReducerContext reducerContext, int numThreads) {
+ DataTableReducerContext reducerContext, int numThreads, ExecutorService
executorService) {
DataSchema dataSchema = dataTable.getDataSchema();
int numGroups = dataTable.getNumberOfRows();
int limit = queryContext.getLimit();
@@ -166,39 +169,41 @@ public final class GroupByUtils {
if (!hasOrderBy) {
int initialCapacity = getIndexedTableInitialCapacity(resultSize,
numGroups, minInitialIndexedTableCapacity);
return getTrimDisabledIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, initialCapacity,
- numThreads);
+ numThreads, executorService);
}
int trimThreshold = getIndexedTableTrimThreshold(trimSize,
reducerContext.getGroupByTrimThreshold());
int initialCapacity = getIndexedTableInitialCapacity(trimThreshold,
numGroups, minInitialIndexedTableCapacity);
if (trimThreshold == Integer.MAX_VALUE) {
return getTrimDisabledIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, initialCapacity,
- numThreads);
+ numThreads, executorService);
} else {
return getTrimEnabledIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, trimSize, trimThreshold,
- initialCapacity, numThreads);
+ initialCapacity, numThreads, executorService);
}
}
private static IndexedTable getTrimDisabledIndexedTable(DataSchema
dataSchema, boolean hasFinalInput,
- QueryContext queryContext, int resultSize, int initialCapacity, int
numThreads) {
+ QueryContext queryContext, int resultSize, int initialCapacity, int
numThreads, ExecutorService executorService) {
if (numThreads == 1) {
return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext,
resultSize, Integer.MAX_VALUE,
- Integer.MAX_VALUE, initialCapacity);
+ Integer.MAX_VALUE, initialCapacity, executorService);
} else {
- return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, initialCapacity);
+ return new UnboundedConcurrentIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, initialCapacity,
+ executorService);
}
}
private static IndexedTable getTrimEnabledIndexedTable(DataSchema
dataSchema, boolean hasFinalInput,
- QueryContext queryContext, int resultSize, int trimSize, int
trimThreshold, int initialCapacity, int numThreads) {
+ QueryContext queryContext, int resultSize, int trimSize, int
trimThreshold, int initialCapacity, int numThreads,
+ ExecutorService executorService) {
assert trimThreshold != Integer.MAX_VALUE;
if (numThreads == 1) {
return new SimpleIndexedTable(dataSchema, hasFinalInput, queryContext,
resultSize, trimSize, trimThreshold,
- initialCapacity);
+ initialCapacity, executorService);
} else {
return new ConcurrentIndexedTable(dataSchema, hasFinalInput,
queryContext, resultSize, trimSize, trimThreshold,
- initialCapacity);
+ initialCapacity, executorService);
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index e77e644fc3..1ebfaf2248 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -331,7 +332,7 @@ public class ResourceManagerAccountingTest {
List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema,
NUM_ROWS, 0);
IndexedTable indexedTable =
new SimpleIndexedTable(dataSchema, false, queryContext, NUM_ROWS,
Integer.MAX_VALUE, Integer.MAX_VALUE,
-
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY,
Executors.newCachedThreadPool());
for (Object[] row : rows) {
indexedTable.upsert(new Record(row));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index af8d8cf2ff..913ce906f6 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -57,7 +57,8 @@ public class IndexedTableTest {
ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE,
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
});
IndexedTable indexedTable =
- new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+ new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY,
+ Executors.newCachedThreadPool());
// 3 threads upsert together
// a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times
(20)
@@ -131,18 +132,22 @@ public class IndexedTableTest {
// Test SimpleIndexedTable
IndexedTable indexedTable =
- new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
+ new SimpleIndexedTable(dataSchema, false, queryContext, 5, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY,
+ Executors.newCachedThreadPool());
IndexedTable mergeTable =
- new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
+ new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY,
+ Executors.newCachedThreadPool());
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
// Test ConcurrentIndexedTable
indexedTable =
- new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY);
+ new ConcurrentIndexedTable(dataSchema, false, queryContext, 5,
TRIM_SIZE, TRIM_THRESHOLD, INITIAL_CAPACITY,
+ Executors.newCachedThreadPool());
mergeTable =
- new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY);
+ new SimpleIndexedTable(dataSchema, false, queryContext, 10, TRIM_SIZE,
TRIM_THRESHOLD, INITIAL_CAPACITY,
+ Executors.newCachedThreadPool());
testNonConcurrent(indexedTable, mergeTable);
indexedTable.finish(true);
checkSurvivors(indexedTable, survivors);
@@ -260,11 +265,11 @@ public class IndexedTableTest {
IndexedTable indexedTable =
new SimpleIndexedTable(dataSchema, false, queryContext, 5,
Integer.MAX_VALUE, Integer.MAX_VALUE,
- INITIAL_CAPACITY);
+ INITIAL_CAPACITY, Executors.newCachedThreadPool());
testNoMoreNewRecordsInTable(indexedTable);
indexedTable = new ConcurrentIndexedTable(dataSchema, false, queryContext,
5, Integer.MAX_VALUE, Integer.MAX_VALUE,
- INITIAL_CAPACITY);
+ INITIAL_CAPACITY, Executors.newCachedThreadPool());
testNoMoreNewRecordsInTable(indexedTable);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
index 78fda6266e..84598ab45b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ArrayTest.java
@@ -873,7 +873,7 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -953,6 +953,6 @@ public class ArrayTest extends
CustomDataQueryClusterIntegrationTest {
));
}
}
- return avroFile;
+ return List.of(avroFile);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
index 8e3f18c30d..6c798d1e68 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BytesTypeTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.List;
import java.util.UUID;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -88,7 +89,7 @@ public class BytesTypeTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -153,8 +154,7 @@ public class BytesTypeTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
-
- return avroFile;
+ return List.of(avroFile);
}
private static String newRandomBase64String() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
index 85bcb59e8e..bf42a3ec5a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -165,7 +166,7 @@ public class CpcSketchTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -188,7 +189,7 @@ public class CpcSketchTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
private byte[] getRandomRawValue() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 6f15e3be17..2d4569f012 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -94,22 +94,26 @@ public abstract class CustomDataQueryClusterIntegrationTest
extends BaseClusterI
Schema schema = createSchema();
addSchema(schema);
- File avroFile = createAvroFile();
+ List<File> avroFiles = createAvroFiles();
if (isRealtimeTable()) {
// create realtime table
- TableConfig tableConfig = createRealtimeTableConfig(avroFile);
+ TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
addTableConfig(tableConfig);
// Push data into Kafka
- pushAvroIntoKafka(List.of(avroFile));
+ pushAvroIntoKafka(avroFiles);
} else {
// create offline table
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);
// create & upload segments
- ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(getTableName(), _tarDir);
+ int segmentIndex = 0;
+ for (File avroFile : avroFiles) {
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile,
tableConfig, schema, segmentIndex++, _segmentDir,
+ _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+ }
}
waitForAllDocsLoaded(60_000);
@@ -247,7 +251,7 @@ public abstract class CustomDataQueryClusterIntegrationTest
extends BaseClusterI
@Override
public abstract Schema createSchema();
- public abstract File createAvroFile()
+ public abstract List<File> createAvroFiles()
throws Exception;
public boolean isRealtimeTable() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
index 6c76127021..7adc80628c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/FloatingPointDataTypeTest.java
@@ -72,7 +72,7 @@ public class FloatingPointDataTypeTest extends
CustomDataQueryClusterIntegration
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws IOException {
// create avro schema
@@ -124,7 +124,7 @@ public class FloatingPointDataTypeTest extends
CustomDataQueryClusterIntegration
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
index b5cf20019b..6b11a8da3a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GeoSpatialTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -130,7 +131,7 @@ public class GeoSpatialTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -185,7 +186,7 @@ public class GeoSpatialTest extends
CustomDataQueryClusterIntegrationTest {
}
}
- return avroFile;
+ return List.of(avroFile);
}
@Test(dataProvider = "useBothQueryEngines")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
index 7dd460d1f5..d865d7defd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/JsonPathTest.java
@@ -96,7 +96,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
List<org.apache.avro.Schema.Field> fields =
@@ -130,7 +130,7 @@ public class JsonPathTest extends
CustomDataQueryClusterIntegrationTest {
}
Collections.sort(_sortedSequenceIds);
- return avroFile;
+ return List.of(avroFile);
}
@Test(dataProvider = "useBothQueryEngines")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
index a9cee052b1..8d54850f23 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeRealtimeTest.java
@@ -92,7 +92,7 @@ public class MapFieldTypeRealtimeTest extends
CustomDataQueryClusterIntegrationT
.build();
}
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringMapAvroSchema =
@@ -126,7 +126,7 @@ public class MapFieldTypeRealtimeTest extends
CustomDataQueryClusterIntegrationT
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
index 74ce24e3d7..e906c5b865 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
@@ -89,7 +89,7 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
.build();
}
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringMapAvroSchema =
@@ -119,7 +119,7 @@ public class MapFieldTypeTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index 4c5571c9a1..578ee5e5ea 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -86,7 +86,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
.build();
}
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
org.apache.avro.Schema stringKeyMapAvroSchema =
@@ -116,7 +116,7 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
}
}
- return avroFile;
+ return List.of(avroFile);
}
protected int getSelectionDefaultDocCount() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
index b087913c7c..2677211430 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/SumPrecisionTest.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -64,7 +65,7 @@ public class SumPrecisionTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws IOException {
// create avro schema
@@ -103,7 +104,7 @@ public class SumPrecisionTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
index 9d963eab75..353cd00396 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TextIndicesTest.java
@@ -132,7 +132,7 @@ public class TextIndicesTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// Read all skills from the skill file
InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt");
@@ -164,7 +164,7 @@ public class TextIndicesTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
index e9b577d977..e63e0ad99c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ThetaSketchTest.java
@@ -87,7 +87,7 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws IOException {
// create avro schema
@@ -171,7 +171,7 @@ public class ThetaSketchTest extends
CustomDataQueryClusterIntegrationTest {
}
}
- return avroFile;
+ return List.of(avroFile);
}
@Test(dataProvider = "useV1QueryEngine")
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
index 60e63898f4..483370b247 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.sql.Timestamp;
+import java.util.List;
import java.util.TimeZone;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -461,7 +462,7 @@ public class TimestampTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -533,6 +534,6 @@ public class TimestampTest extends
CustomDataQueryClusterIntegrationTest {
tsBaseLong += 86400000;
}
}
- return avroFile;
+ return List.of(avroFile);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
index e4cd62c302..d39c64db57 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TupleSketchTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -279,7 +280,7 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -303,7 +304,7 @@ public class TupleSketchTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
private byte[] getRandomRawValue() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
index ccf82c2181..d017a71db8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ULLTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.List;
import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -123,7 +124,7 @@ public class ULLTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -146,7 +147,7 @@ public class ULLTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
private byte[] getRandomRawValue() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
index 78a078d3d9..da2e03c9fe 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/VectorTest.java
@@ -252,7 +252,7 @@ public class VectorTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -320,7 +320,7 @@ public class VectorTest extends
CustomDataQueryClusterIntegrationTest {
fileWriter.append(record);
}
}
- return avroFile;
+ return List.of(avroFile);
}
private float[] createZeroVector(int vectorDimSize) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index d185837c22..a318e5698c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -21,6 +21,9 @@ package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -212,7 +215,6 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
}
}
-
@Test(dataProvider = "useBothQueryEngines")
public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean
useMultiStageQueryEngine)
throws Exception {
@@ -476,6 +478,53 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
}
}
+ @Test(dataProvider = "useV2QueryEngine", invocationCount = 10,
threadPoolSize = 5)
+ public void testFunnelMatchStepWithMultiThreadsReduce(boolean
useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ int numThreadsExtractFinalResult = 2 + new Random().nextInt(10);
+ LOGGER.info("Running testFunnelMatchStepWithMultiThreadsReduce with
numThreadsExtractFinalResult: {}",
+ numThreadsExtractFinalResult);
+ String query =
+ String.format("SET numThreadsExtractFinalResult=" +
numThreadsExtractFinalResult + "; "
+ + "SELECT "
+ + "userId, funnelMatchStep(timestampCol, '1000', 4, "
+ + "url = '/product/search', "
+ + "url = '/cart/add', "
+ + "url = '/checkout/start', "
+ + "url = '/checkout/confirmation', "
+ + "'strict_increase' ) "
+ + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d ",
getTableName(), getCountStarResult());
+ JsonNode jsonNode = postQuery(query);
+ JsonNode rows = jsonNode.get("resultTable").get("rows");
+ assertEquals(rows.size(), 40);
+ for (int i = 0; i < 40; i++) {
+ JsonNode row = rows.get(i);
+ assertEquals(row.size(), 2);
+ assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+ int sumSteps = 0;
+ for (JsonNode step : row.get(1)) {
+ sumSteps += step.intValue();
+ }
+ switch (i / 10) {
+ case 0:
+ assertEquals(sumSteps, 4);
+ break;
+ case 1:
+ assertEquals(sumSteps, 2);
+ break;
+ case 2:
+ assertEquals(sumSteps, 3);
+ break;
+ case 3:
+ assertEquals(sumSteps, 1);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testFunnelMatchStepGroupByQueriesWithModeSkipLeaf(boolean
useMultiStageQueryEngine)
throws Exception {
@@ -860,7 +909,7 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
}
@Override
- public File createAvroFile()
+ public List<File> createAvroFiles()
throws Exception {
// create avro schema
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
@@ -895,10 +944,11 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
}
_countStarResult = totalRows * repeats;
// create avro file
- File avroFile = new File(_tempDir, "data.avro");
- try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
- fileWriter.create(avroSchema, avroFile);
- for (int repeat = 0; repeat < repeats; repeat++) {
+ List<File> avroFiles = new ArrayList<>();
+ for (int repeat = 0; repeat < repeats; repeat++) {
+ File avroFile = new File(_tempDir, "data" + repeat + ".avro");
+ try (DataFileWriter<GenericData.Record> fileWriter = new
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ fileWriter.create(avroSchema, avroFile);
for (int i = 0; i < userUrlValues.length; i++) {
for (int j = 0; j < userUrlValues[i].length; j++) {
GenericData.Record record = new GenericData.Record(avroSchema);
@@ -909,7 +959,8 @@ public class WindowFunnelTest extends
CustomDataQueryClusterIntegrationTest {
}
}
}
+ avroFiles.add(avroFile);
}
- return avroFile;
+ return avroFiles;
}
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 579bd5b227..a8fd8cf98d 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -121,7 +121,7 @@ public class BenchmarkCombineGroupBy {
IndexedTable concurrentIndexedTable =
new ConcurrentIndexedTable(_dataSchema, false, _queryContext,
trimSize, trimSize,
InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD,
-
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
+
InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY,
_executorService);
List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 6c9667533b..7f9be99ea9 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -119,7 +119,7 @@ public class BenchmarkIndexedTable {
// make 1 concurrent table
IndexedTable concurrentIndexedTable =
new ConcurrentIndexedTable(_dataSchema, false, _queryContext,
TRIM_SIZE, TRIM_SIZE, TRIM_THRESHOLD,
- TRIM_THRESHOLD);
+ TRIM_THRESHOLD, _executorService);
// 10 parallel threads putting 10k records into the table
@@ -169,7 +169,7 @@ public class BenchmarkIndexedTable {
// make 10 indexed tables
IndexedTable simpleIndexedTable =
new SimpleIndexedTable(_dataSchema, false, _queryContext, TRIM_SIZE,
TRIM_SIZE, TRIM_THRESHOLD,
- TRIM_THRESHOLD);
+ TRIM_THRESHOLD, _executorService);
simpleIndexedTables.add(simpleIndexedTable);
// put 10k records in each indexed table, in parallel
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index e3c3e0d483..30f4b44e27 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -429,12 +429,21 @@ public class CommonConstants {
* Trimming happens only when (sub)query contains order by clause. */
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE =
"minSegmentGroupTrimSize";
- /** Max number of groups GroupByCombineOperator (running at server)
should return .*/
+ /** Max number of groups GroupByCombineOperator (running at server)
should return. */
public static final String MIN_SERVER_GROUP_TRIM_SIZE =
"minServerGroupTrimSize";
/** Max number of groups GroupByDataTableReducer (running at broker)
should return. */
public static final String MIN_BROKER_GROUP_TRIM_SIZE =
"minBrokerGroupTrimSize";
+ /** Number of threads used in the final reduce.
+ * This is useful for expensive aggregation functions, e.g. funnel
queries are considered expensive
+ * aggregation functions. */
+ public static final String NUM_THREADS_EXTRACT_FINAL_RESULT =
"numThreadsExtractFinalResult";
+
+ /** Chunk size used to split the work when the final result is extracted with multiple threads. */
+ public static final String CHUNK_SIZE_EXTRACT_FINAL_RESULT =
+ "chunkSizeExtractFinalResult";
+
public static final String NUM_REPLICA_GROUPS_TO_QUERY =
"numReplicaGroupsToQuery";
public static final String USE_FIXED_REPLICA = "useFixedReplica";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]