This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ff66173 Deprecate the PostAggregationGapfill. (#8383)
ff66173 is described below
commit ff66173a27eda5d327eb06c669e76a6488c82045
Author: weixiangsun <[email protected]>
AuthorDate: Tue Mar 22 11:34:13 2022 -0700
Deprecate the PostAggregationGapfill. (#8383)
---
.../core/operator/transform/TransformOperator.java | 4 +-
.../reduce/GapFillGroupByDataTableReducer.java | 491 ----------------
.../core/query/reduce/PostAggregationHandler.java | 4 +-
.../core/query/reduce/ResultReducerFactory.java | 3 -
.../org/apache/pinot/core/util/GapfillUtils.java | 30 +-
.../apache/pinot/queries/GapfillQueriesTest.java | 1 -
.../queries/PostAggregationGapfillQueriesTest.java | 616 ---------------------
7 files changed, 8 insertions(+), 1141 deletions(-)
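
For context: the removed PostAggregateGapFill syntax wrapped the time-bucket expression and was reduced on the broker by the now-deleted GapFillGroupByDataTableReducer. The sketch below is abridged from the removed PostAggregationGapfillQueriesTest; the table and column names (parkingData, eventTime, lotId, isOccupied) come from that test and are illustrative only:

    SELECT PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH',
               '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'),
             '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
             '2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col,
           lotId,
           FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') AS status
    FROM parkingData
    WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000
    GROUP BY 1, 2
    ORDER BY 1
    LIMIT 200

Queries of this shape should move to the gapfill()/FILL() form that GapfillUtils continues to validate below and that GapfillQueriesTest covers.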
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index 08bfeb8..d3b752e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -36,7 +36,6 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -63,8 +62,7 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
_projectionOperator = projectionOperator;
_dataSourceMap = projectionOperator.getDataSourceMap();
for (ExpressionContext expression : expressions) {
- TransformFunction transformFunction =
- TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap);
+ TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
_transformFunctionMap.put(expression, transformFunction);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
deleted file mode 100644
index 9f739b0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.reduce;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.BrokerGauge;
-import org.apache.pinot.common.metrics.BrokerMeter;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.request.context.FilterContext;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
-import org.apache.pinot.core.data.table.IndexedTable;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
-import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
-import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.core.util.GapfillUtils;
-import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.core.util.trace.TraceCallable;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-
-
-/**
- * Helper class to reduce data tables and set group by results into the BrokerResponseNative
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GapFillGroupByDataTableReducer implements DataTableReducer {
- private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
-
- private final QueryContext _queryContext;
- private final AggregationFunction[] _aggregationFunctions;
- private final int _numAggregationFunctions;
- private final List<ExpressionContext> _groupByExpressions;
- private final int _numGroupByExpressions;
- private final int _numColumns;
- private final DateTimeGranularitySpec _dateTimeGranularity;
- private final DateTimeFormatSpec _dateTimeFormatter;
- private final long _startMs;
- private final long _endMs;
- private final Set<Key> _groupByKeys;
- private final Map<Key, Object[]> _previousByGroupKey;
- private final int _numOfGroupByKeys;
- private final List<Integer> _groupByKeyIndexes;
- private final boolean [] _isGroupBySelections;
- private int _timeBucketIndex = -1;
-
- GapFillGroupByDataTableReducer(QueryContext queryContext) {
- Preconditions.checkArgument(
- queryContext.getBrokerRequest().getPinotQuery() != null, "GapFill can only be applied to sql query");
- _queryContext = queryContext;
- _aggregationFunctions = queryContext.getAggregationFunctions();
- assert _aggregationFunctions != null;
- _numAggregationFunctions = _aggregationFunctions.length;
- _groupByExpressions = queryContext.getGroupByExpressions();
- assert _groupByExpressions != null;
- _numGroupByExpressions = _groupByExpressions.size();
- _numColumns = _numAggregationFunctions + _numGroupByExpressions;
-
- ExpressionContext gapFillSelection = null;
- for (ExpressionContext expressionContext : _queryContext.getSelectExpressions()) {
- if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
- gapFillSelection = expressionContext;
- break;
- }
- }
-
- List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
- Preconditions.checkArgument(
- args.size() == 5, "PostAggregateGapFill does not have correct number of arguments.");
- Preconditions.checkArgument(
- args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
- Preconditions.checkArgument(
- args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
- Preconditions.checkArgument(
- args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
- Preconditions.checkArgument(
- args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
-
- boolean orderByTimeBucket = false;
- if (_queryContext.getOrderByExpressions() != null && !_queryContext.getOrderByExpressions().isEmpty()) {
- OrderByExpressionContext firstOrderByExpression = _queryContext.getOrderByExpressions().get(0);
- orderByTimeBucket =
- firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
- }
-
- Preconditions.checkArgument(
- orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
-
- _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
- _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
- String start = args.get(2).getLiteral();
- String end = args.get(3).getLiteral();
- _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
- _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
- _groupByKeys = new HashSet<>();
- _previousByGroupKey = new HashMap<>();
- _numOfGroupByKeys = _queryContext.getGroupByExpressions().size() - 1;
- _groupByKeyIndexes = new ArrayList<>();
- _isGroupBySelections = new boolean[_queryContext.getSelectExpressions().size()];
-
- for (ExpressionContext expressionContext : _groupByExpressions) {
- if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
- for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
- if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
- _timeBucketIndex = i;
- _isGroupBySelections[i] = true;
- break;
- }
- }
- } else {
- for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
- if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
- _groupByKeyIndexes.add(i);
- _isGroupBySelections[i] = true;
- break;
- }
- }
- }
- }
-
- Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
- }
-
- private long truncate(long epoch) {
- int sz = _dateTimeGranularity.getSize();
- return epoch / sz * sz;
- }
-
- /**
- * Reduces and sets group by results into ResultTable, if responseFormat = sql
- * By default, sets group by results into GroupByResults
- */
- @Override
- public void reduceAndSetResults(String tableName, DataSchema dataSchema,
- Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
- DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
- assert dataSchema != null;
- Collection<DataTable> dataTables = dataTableMap.values();
-
- try {
- setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
- brokerMetrics);
- } catch (TimeoutException e) {
- brokerResponseNative.getProcessingExceptions()
- .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
- }
- int resultSize = brokerResponseNative.getResultTable().getRows().size();
-
- if (brokerMetrics != null && resultSize > 0) {
- brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
- }
- }
-
- private Key constructGroupKeys(Object[] row) {
- Object [] groupKeys = new Object[_numOfGroupByKeys];
- for (int i = 0; i < _numOfGroupByKeys; i++) {
- groupKeys[i] = row[_groupByKeyIndexes.get(i)];
- }
- return new Key(groupKeys);
- }
-
- /**
- * Extract group by order by results and set into {@link ResultTable}
- * @param brokerResponseNative broker response
- * @param dataSchema data schema
- * @param dataTables Collection of data tables
- * @param reducerContext DataTableReducer context
- * @param rawTableName table name
- * @param brokerMetrics broker metrics (meters)
- * @throws TimeoutException If unable complete within timeout.
- */
- private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
- Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
- BrokerMetrics brokerMetrics)
- throws TimeoutException {
- IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
- brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
- }
- DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
- ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
-
- PostAggregationHandler postAggregationHandler =
- new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
- DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
- ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
- Iterator<Record> sortedIterator = indexedTable.iterator();
- while (sortedIterator.hasNext()) {
- Object[] row = sortedIterator.next().getValues();
- extractFinalAggregationResults(row);
- for (int i = 0; i < columnDataTypes.length; i++) {
- row[i] = columnDataTypes[i].convert(row[i]);
- }
- Object[] resultRow = postAggregationHandler.getResult(row);
- for (int i = 0; i < resultColumnDataTypes.length; i++) {
- resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
- }
-
- _groupByKeys.add(constructGroupKeys(resultRow));
- }
-
- List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler);
- brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
- }
-
- List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) {
- DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
- ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
- int limit = _queryContext.getLimit();
- int numResultColumns = resultColumnDataTypes.length;
- List<Object[]> gapfillResultRows = new ArrayList<>(limit);
- long step = _dateTimeGranularity.granularityToMillis();
- FilterContext havingFilter = _queryContext.getHavingFilter();
- HavingFilterHandler havingFilterHandler = null;
- if (havingFilter != null) {
- havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
- }
- Record record = null;
- for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
- Set<Key> keys = new HashSet<>(_groupByKeys);
- if (record == null && sortedIterator.hasNext()) {
- record = sortedIterator.next();
- }
-
- while (record != null) {
- Object[] row = record.getValues();
-
- Object[] resultRow = postAggregationHandler.getResult(row);
- for (int i = 0; i < resultColumnDataTypes.length; i++) {
- resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
- }
-
- long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex]));
- if (timeCol > time) {
- break;
- }
- if (timeCol == time) {
- if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) {
- gapfillResultRows.add(resultRow);
- if (gapfillResultRows.size() == limit) {
- return gapfillResultRows;
- }
- }
- Key key = constructGroupKeys(resultRow);
- keys.remove(key);
- _previousByGroupKey.put(key, resultRow);
- }
- if (sortedIterator.hasNext()) {
- record = sortedIterator.next();
- } else {
- record = null;
- }
- }
-
- for (Key key : keys) {
- Object[] gapfillRow = new Object[numResultColumns];
- int keyIndex = 0;
- for (int i = 0; i < _isGroupBySelections.length; i++) {
- if (_isGroupBySelections[i]) {
- if (i == _timeBucketIndex) {
- if (resultColumnDataTypes[i] == ColumnDataType.LONG) {
- gapfillRow[_timeBucketIndex] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
- } else {
- gapfillRow[_timeBucketIndex] = _dateTimeFormatter.fromMillisToFormat(time);
- }
- } else {
- gapfillRow[i] = key.getValues()[keyIndex++];
- }
- } else {
- gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
- }
- }
-
- if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
- gapfillResultRows.add(gapfillRow);
- if (gapfillResultRows.size() == limit) {
- return gapfillResultRows;
- }
- }
- }
- }
- return gapfillResultRows;
- }
-
- Object getFillValue(int columIndex, Object key, ColumnDataType dataType) {
- ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columIndex);
- if (expressionContext.getFunction() != null && GapfillUtils.isFill(expressionContext)) {
- List<ExpressionContext> args = expressionContext.getFunction().getArguments();
- if (args.get(1).getLiteral() == null) {
- throw new UnsupportedOperationException("Wrong Sql.");
- }
- GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
- if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
- // TODO: may fill the default value from sql in the future.
- return GapfillUtils.getDefaultValue(dataType);
- } else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
- Object[] row = _previousByGroupKey.get(key);
- if (row != null) {
- return row[columIndex];
- } else {
- return GapfillUtils.getDefaultValue(dataType);
- }
- } else {
- throw new UnsupportedOperationException("unsupported fill type.");
- }
- } else {
- return GapfillUtils.getDefaultValue(dataType);
- }
- }
-
- /**
- * Helper method to extract the final aggregation results for the given row (in-place).
- */
- private void extractFinalAggregationResults(Object[] row) {
- for (int i = 0; i < _numAggregationFunctions; i++) {
- int valueIndex = i + _numGroupByExpressions;
- row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
- }
- }
-
- /**
- * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
- */
- private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
- String[] columnNames = dataSchema.getColumnNames();
- ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
- System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
- for (int i = 0; i < _numAggregationFunctions; i++) {
- columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
- }
- return new DataSchema(columnNames, columnDataTypes);
- }
-
- private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
- DataTableReducerContext reducerContext)
- throws TimeoutException {
- long start = System.currentTimeMillis();
- int numDataTables = dataTablesToReduce.size();
-
- // Get the number of threads to use for reducing.
- // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
- int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
- int limit = _queryContext.getLimit();
- // TODO: Make minTrimSize configurable
- int trimSize = GroupByUtils.getTableCapacity(limit);
- // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
- // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
- int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
- int trimThreshold = reducerContext.getGroupByTrimThreshold();
- IndexedTable indexedTable;
- if (numReduceThreadsToUse <= 1) {
- indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
- } else {
- if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
- } else {
- indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
- }
- }
-
- // Create groups of data tables that each thread can process concurrently.
- // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
- ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
- List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
-
- for (int i = 0; i < numReduceThreadsToUse; i++) {
- reduceGroups.add(new ArrayList<>());
- }
- for (int i = 0; i < numDataTables; i++) {
- reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
- }
-
- ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
- long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
- try {
- reducerContext.getExecutorService().invokeAll(reduceGroups.stream()
- .map(reduceGroup -> new TraceCallable<Void>() {
- @Override
- public Void callJob() throws Exception {
- for (DataTable dataTable : reduceGroup) {
- int numRows = dataTable.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] values = new Object[_numColumns];
- for (int colId = 0; colId < _numColumns; colId++) {
- switch (storedColumnDataTypes[colId]) {
- case INT:
- values[colId] = dataTable.getInt(rowId, colId);
- break;
- case LONG:
- values[colId] = dataTable.getLong(rowId, colId);
- break;
- case FLOAT:
- values[colId] = dataTable.getFloat(rowId, colId);
- break;
- case DOUBLE:
- values[colId] = dataTable.getDouble(rowId, colId);
- break;
- case STRING:
- values[colId] = dataTable.getString(rowId, colId);
- break;
- case BYTES:
- values[colId] = dataTable.getBytes(rowId, colId);
- break;
- case OBJECT:
- values[colId] = dataTable.getObject(rowId, colId);
- break;
- // Add other aggregation intermediate result / group-by column type supports here
- default:
- throw new IllegalStateException();
- }
- }
- indexedTable.upsert(new Record(values));
- }
- }
- return null;
- }
- }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new TimeoutException("Timed out in broker reduce phase.");
- }
-
- indexedTable.finish(true);
- return indexedTable;
- }
-
- /**
- * Computes the number of reduce threads to use per query.
- * <ul>
- * <li> Use single thread if number of data tables to reduce is less than
- * {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
- * <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
- * </ul>
- *
- * @param numDataTables Number of data tables to reduce
- * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
- * @return Number of reduce threads to use for the query
- */
- private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
- // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
- if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
- return Math.min(1, numDataTables); // Number of data tables can be zero.
- }
-
- return Math.min(maxReduceThreadsPerQuery, numDataTables);
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
index 7fb878c..4c4bcaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
@@ -35,7 +35,6 @@ import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.util.GapfillUtils;
/**
@@ -58,7 +57,7 @@ public class PostAggregationHandler implements ValueExtractorFactory {
_numGroupByExpressions = groupByExpressions.size();
_groupByExpressionIndexMap = new HashMap<>();
for (int i = 0; i < _numGroupByExpressions; i++) {
- _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i);
+ _groupByExpressionIndexMap.put(groupByExpressions.get(i), i);
}
} else {
_numGroupByExpressions = 0;
@@ -107,7 +106,6 @@ public class PostAggregationHandler implements ValueExtractorFactory {
*/
@Override
public ValueExtractor getValueExtractor(ExpressionContext expression) {
- expression = GapfillUtils.stripGapfill(expression);
if (expression.getType() == ExpressionContext.Type.LITERAL) {
// Literal
return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index a48a007..3631a3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -23,7 +23,6 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -58,8 +57,6 @@ public final class ResultReducerFactory {
} else {
return new AggregationDataTableReducer(queryContext);
}
- } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
- return new GapFillGroupByDataTableReducer(queryContext);
} else {
// Aggregation group-by query
return new GroupByDataTableReducer(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
index c40837e..988f780 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -39,7 +39,6 @@ import org.apache.pinot.core.query.request.context.QueryContext;
* Util class to encapsulate all utilites required for gapfill.
*/
public class GapfillUtils {
- private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
private static final String GAP_FILL = "gapfill";
private static final String AS = "as";
private static final String FILL = "fill";
@@ -56,29 +55,12 @@ public class GapfillUtils {
FunctionContext function = expression.getFunction();
String functionName = function.getFunctionName();
- if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
+ if (functionName.equals(FILL) || functionName.equals(GAP_FILL)) {
return function.getArguments().get(0);
}
return expression;
}
- public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
- if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
- return false;
- }
-
- return POST_AGGREGATE_GAP_FILL.equals(expressionContext.getFunction().getFunctionName());
- }
-
- public static boolean isPostAggregateGapfill(QueryContext queryContext) {
- for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
- if (isPostAggregateGapfill(expressionContext)) {
- return true;
- }
- }
- return false;
- }
-
public static boolean isFill(ExpressionContext expressionContext) {
if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
return false;
@@ -189,15 +171,15 @@ public class GapfillUtils {
Preconditions.checkArgument(gapFillSelection != null && gapFillSelection.getFunction() != null,
"Gapfill Expression should be function.");
List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
- Preconditions.checkArgument(args.size() > 5, "PreAggregateGapFill does not have correct number of arguments.");
+ Preconditions.checkArgument(args.size() > 5, "Gapfill does not have correct number of arguments.");
Preconditions.checkArgument(args.get(1).getLiteral() != null,
- "The second argument of PostAggregateGapFill should be TimeFormatter.");
+ "The second argument of Gapfill should be TimeFormatter.");
Preconditions.checkArgument(args.get(2).getLiteral() != null,
- "The third argument of PostAggregateGapFill should be start time.");
+ "The third argument of Gapfill should be start time.");
Preconditions.checkArgument(args.get(3).getLiteral() != null,
- "The fourth argument of PostAggregateGapFill should be end time.");
+ "The fourth argument of Gapfill should be end time.");
Preconditions.checkArgument(args.get(4).getLiteral() != null,
- "The fifth argument of PostAggregateGapFill should be time bucket size.");
+ "The fifth argument of Gapfill should be time bucket size.");
ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expressions should be specified.");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
index bf0a28b..829a55d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/GapfillQueriesTest.java
@@ -51,7 +51,6 @@ import org.testng.annotations.Test;
* Queries test for Gapfill queries.
*/
// TODO: Item 1. table alias for subquery in next PR
-// TODO: Item 2. Deprecate PostAggregateGapfill implementation in next PR
@SuppressWarnings("rawtypes")
public class GapfillQueriesTest extends BaseQueriesTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
deleted file mode 100644
index bfd7baa..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DateTimeGranularitySpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Queries test for PostAggregationGapfill queries.
- */
-@SuppressWarnings("rawtypes")
-public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
- private static final String RAW_TABLE_NAME = "parkingData";
- private static final String SEGMENT_NAME = "testSegment";
- private static final Random RANDOM = new Random();
-
- private static final int NUM_LOTS = 4;
-
- private static final String IS_OCCUPIED_COLUMN = "isOccupied";
- private static final String LOT_ID_COLUMN = "lotId";
- private static final String EVENT_TIME_COLUMN = "eventTime";
- private static final Schema SCHEMA = new Schema.SchemaBuilder()
- .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN)
- .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
- .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
- .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
- .build();
- private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
- .build();
-
- private IndexSegment _indexSegment;
- private List<IndexSegment> _indexSegments;
-
- @Override
- protected String getFilter() {
- // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
- return " WHERE eventTime >= 0";
- }
-
- @Override
- protected IndexSegment getIndexSegment() {
- return _indexSegment;
- }
-
- @Override
- protected List<IndexSegment> getIndexSegments() {
- return _indexSegments;
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- FileUtils.deleteDirectory(INDEX_DIR);
-
- long current = 1636286400000L; //November 7, 2021 12:00:00 PM
- int duplicates = 16;
- int interval = 1000 * 900; // 15 minutes
- long start = current - duplicates * 2 * interval; //November 7, 2021 4:00:00 AM
-
- List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2);
- for (int i = 0; i < NUM_LOTS; i++) {
- for (int j = 0; j < duplicates; j++) {
- if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) {
- continue;
- }
- long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval);
- long departingTime = j == 3 ? start + interval * (2 * j + 6) + RANDOM.nextInt(interval) : start
- + interval * (2 * j + 1) + RANDOM.nextInt(interval);
-
- GenericRow parkingRow = new GenericRow();
- parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime);
- parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
- parkingRow.putValue(IS_OCCUPIED_COLUMN, true);
- records.add(parkingRow);
-
- GenericRow departingRow = new GenericRow();
- departingRow.putValue(EVENT_TIME_COLUMN, departingTime);
- departingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i));
- departingRow.putValue(IS_OCCUPIED_COLUMN, false);
- records.add(departingRow);
- }
- }
-
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
- segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
- segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
- driver.build();
-
- ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
- _indexSegment = immutableSegment;
- _indexSegments = Arrays.asList(immutableSegment);
- }
-
- @Test
- public void datetimeconvertGapfillTest() {
- String dataTimeConvertQuery = "SELECT "
- + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS') AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponse =
getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
- ResultTable dateTimeConvertResultTable =
dateTimeConvertBrokerResponse.getResultTable();
- Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS
time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter
- = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd
HH:mm:ss.SSS");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("2021-11-07
03:00:00.000");
- for (int i = 0; i < 32; i += 4) {
- String firstTimeCol = (String) gapFillRows.get(i)[0];
- long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void toEpochHoursGapfillTest() {
- String dataTimeConvertQuery = "SELECT "
- + "ToEpochHours(eventTime) AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponse =
getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
- ResultTable dateTimeConvertResultTable =
dateTimeConvertBrokerResponse.getResultTable();
- Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
- + "'454515', '454524', '1:HOURS') AS time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter = new
DateTimeFormatSpec("1:HOURS:EPOCH");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("454515");
- for (int i = 0; i < 32; i += 4) {
- Long firstTimeCol = (Long) gapFillRows.get(i)[0];
- long timeStamp =
dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void toEpochMinutesRoundedHoursGapfillTest() {
- String dataTimeConvertQuery = "SELECT "
- + "ToEpochMinutesRounded(eventTime, 60) AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponse =
getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
- ResultTable dateTimeConvertResultTable =
dateTimeConvertBrokerResponse.getResultTable();
- Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60),
'1:HOURS:EPOCH', "
- + "'454515', '454524', '1:HOURS') AS time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter = new
DateTimeFormatSpec("1:HOURS:EPOCH");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("454515");
- for (int i = 0; i < 32; i += 4) {
- Long firstTimeCol = (Long) gapFillRows.get(i)[0];
- long timeStamp =
dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void toEpochMinutesBucketHoursGapfillTest() {
- String dataTimeConvertQuery = "SELECT "
- + "ToEpochMinutesBucket(eventTime, 60) AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponse =
getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
- ResultTable dateTimeConvertResultTable =
dateTimeConvertBrokerResponse.getResultTable();
- Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60),
'1:HOURS:EPOCH', "
- + "'454515', '454524', '1:HOURS') AS time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter = new
DateTimeFormatSpec("1:HOURS:EPOCH");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("454515");
- for (int i = 0; i < 32; i += 4) {
- Long firstTimeCol = (Long) gapFillRows.get(i)[0];
- long timeStamp =
dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void dateTruncHoursGapfillTest() {
- String dataTimeConvertQuery = "SELECT "
- + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponse =
getBrokerResponseForSqlQuery(dataTimeConvertQuery);
-
- ResultTable dateTimeConvertResultTable =
dateTimeConvertBrokerResponse.getResultTable();
- Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
-
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'),
'1:HOURS:EPOCH', "
- + "'454515', '454524', '1:HOURS') AS time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter = new
DateTimeFormatSpec("1:HOURS:EPOCH");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("454515");
- for (int i = 0; i < 32; i += 4) {
- Long firstTimeCol = (Long) gapFillRows.get(i)[0];
- long timeStamp =
dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() {
- try {
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS')
AS time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "LIMIT 200";
-
- getBrokerResponseForSqlQuery(gapfillQuery);
- Assert.fail();
- } catch (IllegalArgumentException e) {
- Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work
if the time bucket is not ordered.");
- }
- }
-
- @Test
- public void datetimeconvertGapfillTestWithHavingClause() {
- String dataTimeConvertQueryWithUnoccupied = "SELECT "
- + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS') AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "HAVING status = 'false' "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied
- = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithUnoccupied);
-
- ResultTable dateTimeConvertResultTableWithUnoccupied =
dateTimeConvertBrokerResponseWithUnoccupied.getResultTable();
-
Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(),
20);
-
- String dataTimeConvertQueryWithOccupied = "SELECT "
- + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS') AS time_col, "
- + "lotId, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "HAVING status = 'true' "
- + "ORDER BY 1 "
- + "LIMIT 200";
-
- BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied
- = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithOccupied);
-
- ResultTable dateTimeConvertResultTableWithOccupied =
dateTimeConvertBrokerResponseWithOccupied.getResultTable();
-
Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4);
-
- String gapfillQueryWithOccupied = "SELECT "
- + "PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS
time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "HAVING status = 'true' "
- + "ORDER BY 1 "
- + "LIMIT 7";
-
- BrokerResponseNative gapfillBrokerResponseWithOccupied =
getBrokerResponseForSqlQuery(gapfillQueryWithOccupied);
-
- ResultTable gapFillResultTableWithOccupied =
gapfillBrokerResponseWithOccupied.getResultTable();
- Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7);
-
- for (Object [] row : gapFillResultTableWithOccupied.getRows()) {
- Assert.assertEquals(row[2], true);
- }
-
- String gapfillQueryWithUnoccupied = "SELECT "
- + "PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS
time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "HAVING status = 'false' "
- + "ORDER BY 1 "
- + "LIMIT 24";
-
- BrokerResponseNative gapfillBrokerResponseWithUnoccupied =
getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied);
-
- ResultTable gapFillResultTableWithUnoccupied =
gapfillBrokerResponseWithUnoccupied.getResultTable();
- Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24);
- for (Object [] row : gapFillResultTableWithUnoccupied.getRows()) {
- Assert.assertEquals(row[2], false);
- }
- }
-
-
- @Test
- public void datetimeconvertGapfillTestTimeBucketAsLastSelection() {
- String gapfillQuery = "SELECT "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, "
- + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS
time_col "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 4, 5 "
- + "ORDER BY 5 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter
- = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd
HH:mm:ss.SSS");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("2021-11-07
03:00:00.000");
- for (int i = 0; i < 32; i += 4) {
- String timeCol = (String) gapFillRows.get(i)[4];
- long timeStamp = dateTimeFormatter.fromFormatToMillis(timeCol);
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[3]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[4], gapFillRows.get(i + j)[4]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[3]));
- lots.add((String) gapFillRows.get(i + j)[3]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @Test
- public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() {
- String gapfillQuery = "SELECT "
- + "PostAggregateGapFill(DATETIMECONVERT(eventTime,
'1:MILLISECONDS:EPOCH', "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
'1:HOURS'), "
- + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
- + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS
time_col, "
- + "lotId, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_PREVIOUS_VALUE') as status1, "
- + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'),
'FILL_DEFAULT_VALUE') as status2, "
- + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
- + "FROM parkingData "
- + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
- + "GROUP BY 1, 2 "
- + "ORDER BY 1, 2 "
- + "LIMIT 200";
-
- DateTimeFormatSpec dateTimeFormatter
- = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd
HH:mm:ss.SSS");
- DateTimeGranularitySpec dateTimeGranularity = new
DateTimeGranularitySpec("1:HOURS");
-
- BrokerResponseNative gapfillBrokerResponse =
getBrokerResponseForSqlQuery(gapfillQuery);
-
- ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
- Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
- List<Object[]> gapFillRows = gapFillResultTable.getRows();
- long start = dateTimeFormatter.fromFormatToMillis("2021-11-07
03:00:00.000");
- for (int i = 0; i < 32; i += 4) {
- String firstTimeCol = (String) gapFillRows.get(i)[0];
- long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
- Assert.assertEquals(timeStamp, start);
- Set<String> lots = new HashSet<>();
- lots.add((String) gapFillRows.get(i)[1]);
- for (int j = 1; j < 4; j++) {
- Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
- Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
- lots.add((String) gapFillRows.get(i + j)[1]);
- }
- start += dateTimeGranularity.granularityToMillis();
- }
- }
-
- @AfterClass
- public void tearDown()
- throws IOException {
- _indexSegment.destroy();
- FileUtils.deleteDirectory(INDEX_DIR);
- }
-}