This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f0c6bba73c [timeseries] Response Size Limit, Metrics and Series Limit
(#14501)
f0c6bba73c is described below
commit f0c6bba73c0e4de43115d69ffc4ffdc010848fee
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Nov 22 13:45:41 2024 -0600
[timeseries] Response Size Limit, Metrics and Series Limit (#14501)
---
.../requesthandler/TimeSeriesRequestHandler.java | 38 +++++---
.../apache/pinot/common/metrics/BrokerMeter.java | 8 ++
.../blocks/results/TimeSeriesResultsBlock.java | 14 ++-
.../merger/TimeSeriesAggResultsBlockMerger.java | 13 +++
.../timeseries/TimeSeriesAggregationOperator.java | 25 ++++-
.../apache/pinot/core/plan/TimeSeriesPlanNode.java | 3 +-
.../TimeSeriesAggregationOperatorTest.java | 103 ++++++++++++++++++++-
.../runtime/timeseries/LeafTimeSeriesOperator.java | 4 +
.../timeseries/TimeSeriesDispatchClient.java | 7 +-
.../spi/series/SimpleTimeSeriesBuilderFactory.java | 19 ++++
.../tsdb/spi/series/TimeSeriesBuilderFactory.java | 14 +++
11 files changed, 221 insertions(+), 27 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 1773fca957..52cf63f562 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.HumanReadableDuration;
@@ -90,19 +93,32 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
RequestContext requestContext) {
- requestContext.setBrokerId(_brokerId);
- requestContext.setRequestId(_requestIdGenerator.get());
- RangeTimeSeriesRequest timeSeriesRequest = null;
+ PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
+ long queryStartTime = System.currentTimeMillis();
try {
- timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
+ requestContext.setBrokerId(_brokerId);
+ requestContext.setRequestId(_requestIdGenerator.get());
+ RangeTimeSeriesRequest timeSeriesRequest = null;
+ try {
+ timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString);
+ } catch (URISyntaxException e) {
+ return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST",
"Error building RangeTimeSeriesRequest");
+ }
+ TimeSeriesLogicalPlanResult logicalPlanResult =
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
+ TimeSeriesDispatchablePlan dispatchablePlan =
+ _queryEnvironment.buildPhysicalPlan(timeSeriesRequest,
requestContext, logicalPlanResult);
+ timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext,
dispatchablePlan,
+ timeSeriesRequest.getTimeout().toMillis(), new HashMap<>());
+ return timeSeriesResponse;
+ } finally {
+ _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS,
System.currentTimeMillis() - queryStartTime,
+ TimeUnit.MILLISECONDS);
+ if (timeSeriesResponse == null
+ ||
timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS))
{
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
1);
+ }
}
- TimeSeriesLogicalPlanResult logicalPlanResult =
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
- TimeSeriesDispatchablePlan dispatchablePlan =
_queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
- logicalPlanResult);
- return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan,
timeSeriesRequest.getTimeout().toMillis(),
- new HashMap<>());
}
@Override
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 086d5ffd9e..c36b4ab504 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -60,6 +60,14 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* Number of single-stage queries executed that would not have successfully
run on the multi-stage query engine as is.
*/
SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
+ /**
+ * Number of time-series queries. This metric is not grouped on the table
name.
+ */
+ TIME_SERIES_GLOBAL_QUERIES("queries", true),
+ /**
+ * Number of time-series queries that failed. This metric is not grouped on
the table name.
+ */
+ TIME_SERIES_GLOBAL_QUERIES_FAILED("queries", true),
// These metrics track the exceptions caught during query execution in
broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
index 30a66bd624..f8e7fac944 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java
@@ -27,6 +27,7 @@ import
org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
+// TODO(timeseries): Implement unsupported functions when merging with MSE.
public class TimeSeriesResultsBlock extends BaseResultsBlock {
private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock;
@@ -36,34 +37,31 @@ public class TimeSeriesResultsBlock extends
BaseResultsBlock {
@Override
public int getNumRows() {
- // TODO: Unused right now.
- return 0;
+ return _timeSeriesBuilderBlock.getSeriesBuilderMap().size();
}
@Nullable
@Override
public QueryContext getQueryContext() {
- // TODO: Unused right now.
- return null;
+ throw new UnsupportedOperationException("Time series results block does
not support getting QueryContext yet");
}
@Nullable
@Override
public DataSchema getDataSchema() {
- // TODO: Unused right now.
- return null;
+ throw new UnsupportedOperationException("Time series results block does
not support getting DataSchema yet");
}
@Nullable
@Override
public List<Object[]> getRows() {
- return null;
+ throw new UnsupportedOperationException("Time series results block does
not support getRows yet");
}
@Override
public DataTable getDataTable()
throws IOException {
- return null;
+ throw new UnsupportedOperationException("Time series results block does
not support returning DataTable");
}
public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
index 17f22a1737..428cfde555 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import com.google.common.base.Preconditions;
import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.tsdb.spi.AggInfo;
@@ -28,10 +29,14 @@ import
org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
public class TimeSeriesAggResultsBlockMerger implements
ResultsBlockMerger<TimeSeriesResultsBlock> {
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final AggInfo _aggInfo;
+ private final int _maxSeriesLimit;
+ private final long _maxDataPointsLimit;
public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory
seriesBuilderFactory, AggInfo aggInfo) {
_seriesBuilderFactory = seriesBuilderFactory;
_aggInfo = aggInfo;
+ _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
+ _maxDataPointsLimit =
_seriesBuilderFactory.getMaxDataPointsPerServerLimit();
}
@Override
@@ -44,6 +49,14 @@ public class TimeSeriesAggResultsBlockMerger implements
ResultsBlockMerger<TimeS
BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
if (currentTimeSeriesBuilder == null) {
currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash,
newTimeSeriesToMerge);
+ final long currentUniqueSeries =
currentTimeSeriesBlock.getSeriesBuilderMap().size();
+ final long numBuckets =
currentTimeSeriesBlock.getTimeBuckets().getNumBuckets();
+ Preconditions.checkState(currentUniqueSeries * numBuckets <=
_maxDataPointsLimit,
+ "Max data points limit reached in combine operator. Limit: %s.
Current count: %s",
+ _maxDataPointsLimit, currentUniqueSeries * numBuckets);
+ Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit,
+ "Max series limit reached in combine operator. Limit: %s. Current
Size: %s",
+ _maxSeriesLimit,
currentTimeSeriesBlock.getSeriesBuilderMap().size());
} else {
currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index fd895a4607..a39c996f4f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.timeseries;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.HashMap;
@@ -37,6 +38,7 @@ import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
@@ -59,6 +61,10 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
private final BaseProjectOperator<? extends ValueBlock> _projectOperator;
private final TimeBuckets _timeBuckets;
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
+ private final int _maxSeriesLimit;
+ private final long _maxDataPointsLimit;
+ private final long _numTotalDocs;
+ private long _numDocsScanned = 0;
public TimeSeriesAggregationOperator(
String timeColumn,
@@ -69,7 +75,8 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
List<String> groupByExpressions,
TimeBuckets timeBuckets,
BaseProjectOperator<? extends ValueBlock> projectOperator,
- TimeSeriesBuilderFactory seriesBuilderFactory) {
+ TimeSeriesBuilderFactory seriesBuilderFactory,
+ SegmentMetadata segmentMetadata) {
_timeColumn = timeColumn;
_storedTimeUnit = timeUnit;
_timeOffset = timeOffsetSeconds == null ? 0L :
timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
@@ -79,6 +86,9 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
_projectOperator = projectOperator;
_timeBuckets = timeBuckets;
_seriesBuilderFactory = seriesBuilderFactory;
+ _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
+ _maxDataPointsLimit =
_seriesBuilderFactory.getMaxDataPointsPerServerLimit();
+ _numTotalDocs = segmentMetadata.getTotalDocs();
}
@Override
@@ -87,6 +97,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
while ((valueBlock = _projectOperator.nextBlock()) != null) {
int numDocs = valueBlock.getNumDocs();
+ _numDocsScanned += numDocs;
// TODO: This is quite unoptimized and allocates liberally
BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
long[] timeValues = blockValSet.getLongValuesSV();
@@ -129,6 +140,12 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
throw new IllegalStateException(
"Don't yet support value expression of type: " +
valueExpressionBlockValSet.getValueType());
}
+ Preconditions.checkState(seriesBuilderMap.size() * (long)
_timeBuckets.getNumBuckets() <= _maxDataPointsLimit,
+ "Exceeded max data point limit per server. Limit: %s. Data points in
current segment so far: %s",
+ _maxDataPointsLimit, seriesBuilderMap.size() *
_timeBuckets.getNumBuckets());
+ Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit,
+ "Exceeded max unique series limit per server. Limit: %s. Series in
current segment so far: %s",
+ _maxSeriesLimit, seriesBuilderMap.size());
}
return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets,
seriesBuilderMap));
}
@@ -147,8 +164,10 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
@Override
public ExecutionStatistics getExecutionStatistics() {
- // TODO: Implement this.
- return new ExecutionStatistics(0, 0, 0, 0);
+ long numEntriesScannedInFilter =
_projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = _numDocsScanned *
_projectOperator.getNumColumnsProjected();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ _numTotalDocs);
}
@VisibleForTesting
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
index 22e3d7b912..dae0479ebb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
@@ -66,7 +66,8 @@ public class TimeSeriesPlanNode implements PlanNode {
getGroupByColumns(),
_timeSeriesContext.getTimeBuckets(),
projectionOperator,
- _seriesBuilderFactory);
+ _seriesBuilderFactory,
+ _segmentContext.getIndexSegment().getSegmentMetadata());
}
private List<ExpressionContext> getProjectPlanNodeExpressions() {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
index 888d0658e9..eea81a4ba1 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -20,11 +20,19 @@ package org.apache.pinot.core.operator.timeseries;
import java.time.Duration;
import java.util.Collections;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
import org.testng.annotations.Test;
@@ -33,9 +41,49 @@ import static org.testng.Assert.*;
public class TimeSeriesAggregationOperatorTest {
+ private static final Random RANDOM = new Random();
private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
- private static final AggInfo AGG_INFO = new AggInfo("",
Collections.emptyMap());
+ private static final String GROUP_BY_COLUMN = "city";
+ private static final AggInfo AGG_INFO = new AggInfo("SUM",
Collections.emptyMap());
private static final ExpressionContext VALUE_EXPRESSION =
ExpressionContext.forIdentifier("someValueColumn");
+ private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(100), 10);
+ private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;
+
+ @Test
+ public void testTimeSeriesAggregationOperator() {
+ TimeSeriesAggregationOperator timeSeriesAggregationOperator =
buildOperatorWithSampleData(
+ new SimpleTimeSeriesBuilderFactory());
+ TimeSeriesResultsBlock resultsBlock =
timeSeriesAggregationOperator.getNextBlock();
+ // Expect 2 series: Chicago and San Francisco
+ assertNotNull(resultsBlock);
+ assertEquals(2, resultsBlock.getNumRows());
+ }
+
+ @Test
+ public void testTimeSeriesAggregationOperatorWhenSeriesLimit() {
+ // Since we test with 2 series, use 1 as the limit.
+ TimeSeriesAggregationOperator timeSeriesAggregationOperator =
buildOperatorWithSampleData(
+ new SimpleTimeSeriesBuilderFactory(1, 100_000_000L));
+ try {
+ timeSeriesAggregationOperator.getNextBlock();
+ fail();
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Limit: 1. Series in current"));
+ }
+ }
+
+ @Test
+ public void testTimeSeriesAggregationOperatorWhenDataPoints() {
+ // Since we test with 2 series over 10 time buckets (20 data points), use 11 as the data points limit.
+ TimeSeriesAggregationOperator timeSeriesAggregationOperator =
buildOperatorWithSampleData(
+ new SimpleTimeSeriesBuilderFactory(1000, 11));
+ try {
+ timeSeriesAggregationOperator.getNextBlock();
+ fail();
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Limit: 11. Data points in current"));
+ }
+ }
@Test
public void testGetTimeValueIndexForSeconds() {
@@ -88,9 +136,60 @@ public class TimeSeriesAggregationOperatorTest {
assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time
index to spill beyond valid range");
}
+ private TimeSeriesAggregationOperator
buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) {
+ BaseProjectOperator<ValueBlock> mockProjectOperator =
mock(BaseProjectOperator.class);
+ ValueBlock valueBlock = buildValueBlockForProjectOperator();
+ when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock)
null);
+ return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN,
+ TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION,
Collections.singletonList(GROUP_BY_COLUMN),
+ TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory,
mock(SegmentMetadata.class));
+ }
+
+ private static ValueBlock buildValueBlockForProjectOperator() {
+ ValueBlock valueBlock = mock(ValueBlock.class);
+ doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs();
+
doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN);
+
doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION);
+
doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN);
+ return valueBlock;
+ }
+
+ private static BlockValSet buildBlockValSetForGroupByColumns() {
+ BlockValSet blockValSet = mock(BlockValSet.class);
+ String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+ stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco";
+ }
+ doReturn(stringArray).when(blockValSet).getStringValuesSV();
+ doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType();
+ return blockValSet;
+ }
+
+ private static BlockValSet buildBlockValSetForValues() {
+ BlockValSet blockValSet = mock(BlockValSet.class);
+ long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+ valuesArray[index] = index;
+ }
+ doReturn(valuesArray).when(blockValSet).getLongValuesSV();
+ doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType();
+ return blockValSet;
+ }
+
+ private static BlockValSet buildBlockValSetForTime() {
+ BlockValSet blockValSet = mock(BlockValSet.class);
+ long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
+ timeValueArray[index] = 901 + RANDOM.nextInt(1000);
+ }
+ doReturn(timeValueArray).when(blockValSet).getLongValuesSV();
+ return blockValSet;
+ }
+
private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit,
TimeBuckets timeBuckets) {
return new TimeSeriesAggregationOperator(
DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION,
Collections.emptyList(),
- timeBuckets, mock(BaseProjectOperator.class),
mock(TimeSeriesBuilderFactory.class));
+ timeBuckets, mock(BaseProjectOperator.class),
mock(TimeSeriesBuilderFactory.class),
+ mock(SegmentMetadata.class));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
index ca119ebf1d..8b577105d3 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java
@@ -25,6 +25,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
@@ -34,6 +35,7 @@ public class LeafTimeSeriesOperator extends
BaseTimeSeriesOperator {
private final ServerQueryRequest _request;
private final QueryExecutor _queryExecutor;
private final ExecutorService _executorService;
+ private final ServerQueryLogger _queryLogger;
public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest,
QueryExecutor queryExecutor,
ExecutorService executorService) {
@@ -41,6 +43,7 @@ public class LeafTimeSeriesOperator extends
BaseTimeSeriesOperator {
_request = serverQueryRequest;
_queryExecutor = queryExecutor;
_executorService = executorService;
+ _queryLogger = ServerQueryLogger.getInstance();
}
@Override
@@ -48,6 +51,7 @@ public class LeafTimeSeriesOperator extends
BaseTimeSeriesOperator {
Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has
not been initialized");
InstanceResponseBlock instanceResponseBlock =
_queryExecutor.execute(_request, _executorService);
assert instanceResponseBlock.getResultsBlock() instanceof
TimeSeriesResultsBlock;
+ _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries");
if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) {
// TODO: Return error in the TimeSeriesBlock instead?
String oneException =
instanceResponseBlock.getExceptions().values().iterator().next();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
index a68e636b96..df77344665 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java
@@ -29,16 +29,19 @@ import org.apache.pinot.query.routing.QueryServerInstance;
/**
* Dispatch client used to dispatch a runnable plan to the server.
- * TODO: This shouldn't exist and we should re-use DispatchClient. TBD as part
of multi-stage
+ * TODO(timeseries): This shouldn't exist and we should re-use DispatchClient.
TBD as part of multi-stage
* engine integration.
*/
public class TimeSeriesDispatchClient {
+ // TODO(timeseries): Note that time-series engine at present uses
QueryServer for data transfer from server to broker.
+ // This will be fixed as we integrate with MSE.
+ private static final int INBOUND_SIZE_LIMIT = 256 * 1024 * 1024;
private final ManagedChannel _channel;
private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
public TimeSeriesDispatchClient(String host, int port) {
_channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
- _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
+ _dispatchStub =
PinotQueryWorkerGrpc.newStub(_channel).withMaxInboundMessageSize(INBOUND_SIZE_LIMIT);
}
public ManagedChannel getChannel() {
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
index 9ed40954e5..98882d19a7 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java
@@ -28,8 +28,17 @@ import
org.apache.pinot.tsdb.spi.series.builders.SummingTimeSeriesBuilder;
public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory {
+ private final int _maxSeriesLimit;
+ private final long _maxDataPointsLimit;
+
public SimpleTimeSeriesBuilderFactory() {
+ this(DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT,
DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT);
+ }
+
+ public SimpleTimeSeriesBuilderFactory(int maxSeriesLimit, long
maxDataPointsLimit) {
super();
+ _maxSeriesLimit = maxSeriesLimit;
+ _maxDataPointsLimit = maxDataPointsLimit;
}
@Override
@@ -50,4 +59,14 @@ public class SimpleTimeSeriesBuilderFactory extends
TimeSeriesBuilderFactory {
@Override
public void init(PinotConfiguration pinotConfiguration) {
}
+
+ @Override
+ public int getMaxUniqueSeriesPerServerLimit() {
+ return _maxSeriesLimit;
+ }
+
+ @Override
+ public long getMaxDataPointsPerServerLimit() {
+ return _maxDataPointsLimit;
+ }
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
index 088f9b3c85..c48307efea 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
@@ -25,6 +25,12 @@ import org.apache.pinot.tsdb.spi.TimeBuckets;
public abstract class TimeSeriesBuilderFactory {
+ protected static final int DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT =
100_000;
+ /**
+ * Default limit for the total number of values across all series.
+ */
+ protected static final long DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT =
100_000_000;
+
public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(
AggInfo aggInfo,
String id,
@@ -32,5 +38,13 @@ public abstract class TimeSeriesBuilderFactory {
List<String> tagNames,
Object[] tagValues);
+ public int getMaxUniqueSeriesPerServerLimit() {
+ return DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT;
+ }
+
+ public long getMaxDataPointsPerServerLimit() {
+ return DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT;
+ }
+
public abstract void init(PinotConfiguration pinotConfiguration);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]