This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e8dcba123a Fix query option validation for group-by queries (#14618)
e8dcba123a is described below
commit e8dcba123aad95ba2848f88a4b8c7b4db2e1ff26
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Dec 8 17:35:26 2024 -0800
Fix query option validation for group-by queries (#14618)
---
.../BaseSingleStageBrokerRequestHandler.java | 16 +-
.../common/utils/config/QueryOptionsUtils.java | 160 +++++----
.../common/utils/config/QueryOptionsUtilsTest.java | 137 +++++---
.../operator/combine/GroupByCombineOperator.java | 26 +-
.../streaming/StreamingGroupByCombineOperator.java | 23 +-
.../core/plan/maker/InstancePlanMakerImplV2.java | 2 +-
.../core/query/reduce/GroupByDataTableReducer.java | 16 +-
.../aggregation/function/ArrayAggFunctionTest.java | 374 +++++++++------------
.../pinot/queries/WithOptionQueriesTest.java | 190 -----------
.../query/runtime/queries/QueryRunnerTest.java | 120 +++----
10 files changed, 438 insertions(+), 626 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index ed6c58ad0f..1364919592 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1847,21 +1847,7 @@ public abstract class
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
throw new IllegalStateException(
"Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of
" + queryResponseLimit);
}
-
- Map<String, String> queryOptions = pinotQuery.getQueryOptions();
- try {
- // throw errors if options is less than 1 or invalid
- Integer numReplicaGroupsToQuery =
QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
- if (numReplicaGroupsToQuery != null) {
- Preconditions.checkState(numReplicaGroupsToQuery > 0,
"numReplicaGroups must be " + "positive number, got: %d",
- numReplicaGroupsToQuery);
- }
- } catch (NumberFormatException e) {
- String numReplicaGroupsToQuery =
queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
- throw new IllegalStateException(
- String.format("numReplicaGroups must be a positive number, got: %s",
numReplicaGroupsToQuery));
- }
-
+ QueryOptionsUtils.getNumReplicaGroupsToQuery(pinotQuery.getQueryOptions());
if (pinotQuery.getDataSource().getSubquery() != null) {
validateRequest(pinotQuery.getDataSource().getSubquery(),
queryResponseLimit);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index bcc82efbf5..1ac9e6fab8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -98,19 +98,19 @@ public class QueryOptionsUtils {
@Nullable
public static Long getTimeoutMs(Map<String, String> queryOptions) {
String timeoutMsString = queryOptions.get(QueryOptionKey.TIMEOUT_MS);
- return checkedParseLong(QueryOptionKey.TIMEOUT_MS, timeoutMsString, 1);
+ return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS,
timeoutMsString);
}
@Nullable
public static Long getMaxServerResponseSizeBytes(Map<String, String>
queryOptions) {
String responseSize =
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
- return checkedParseLong(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
responseSize, 1);
+ return
checkedParseLongPositive(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
responseSize);
}
@Nullable
public static Long getMaxQueryResponseSizeBytes(Map<String, String>
queryOptions) {
String responseSize =
queryOptions.get(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES);
- return checkedParseLong(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES,
responseSize, 1);
+ return
checkedParseLongPositive(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES,
responseSize);
}
public static boolean isAndScanReorderingEnabled(Map<String, String>
queryOptions) {
@@ -179,7 +179,7 @@ public class QueryOptionsUtils {
@Nullable
public static Integer getNumReplicaGroupsToQuery(Map<String, String>
queryOptions) {
String numReplicaGroupsToQuery =
queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
- return checkedParseInt(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY,
numReplicaGroupsToQuery);
+ return checkedParseIntPositive(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY,
numReplicaGroupsToQuery);
}
public static boolean isExplainPlanVerbose(Map<String, String> queryOptions)
{
@@ -201,25 +201,35 @@ public class QueryOptionsUtils {
@Nullable
public static Integer getMaxExecutionThreads(Map<String, String>
queryOptions) {
String maxExecutionThreadsString =
queryOptions.get(QueryOptionKey.MAX_EXECUTION_THREADS);
- return checkedParseInt(QueryOptionKey.MAX_EXECUTION_THREADS,
maxExecutionThreadsString);
+ return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS,
maxExecutionThreadsString);
}
@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String>
queryOptions) {
String minSegmentGroupTrimSizeString =
queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
- return checkedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
minSegmentGroupTrimSizeString);
+ // NOTE: Non-positive value means turning off the segment level trim
+ return uncheckedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
minSegmentGroupTrimSizeString);
}
@Nullable
public static Integer getMinServerGroupTrimSize(Map<String, String>
queryOptions) {
String minServerGroupTrimSizeString =
queryOptions.get(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
- return checkedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE,
minServerGroupTrimSizeString);
+ // NOTE: Non-positive value means turning off the server level trim
+ return uncheckedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE,
minServerGroupTrimSizeString);
}
@Nullable
public static Integer getMinBrokerGroupTrimSize(Map<String, String>
queryOptions) {
String minBrokerGroupTrimSizeString =
queryOptions.get(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE);
- return checkedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE,
minBrokerGroupTrimSizeString);
+ // NOTE: Non-positive value means turning off the broker level trim
+ return uncheckedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE,
minBrokerGroupTrimSizeString);
+ }
+
+ @Nullable
+ public static Integer getGroupTrimThreshold(Map<String, String>
queryOptions) {
+ String groupByTrimThreshold =
queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
+ // NOTE: Non-positive value means turning off the on-the-fly trim before
all groups are added
+ return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD,
groupByTrimThreshold);
}
public static boolean isNullHandlingEnabled(Map<String, String>
queryOptions) {
@@ -246,73 +256,25 @@ public class QueryOptionsUtils {
@Nullable
public static Integer getMultiStageLeafLimit(Map<String, String>
queryOptions) {
String maxLeafLimitStr =
queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
- return checkedParseInt(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
maxLeafLimitStr);
+ return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
maxLeafLimitStr);
}
@Nullable
public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
String maxNumGroupLimit =
queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
- return checkedParseInt(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit);
+ return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_LIMIT,
maxNumGroupLimit);
}
@Nullable
public static Integer getMaxInitialResultHolderCapacity(Map<String, String>
queryOptions) {
- String maxInitResultCap =
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
- return checkedParseInt(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
maxInitResultCap);
+ String maxInitialResultHolderCapacity =
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+ return
checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
maxInitialResultHolderCapacity);
}
public static boolean
optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions)
{
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
}
- @Nullable
- public static Integer getGroupTrimThreshold(Map<String, String>
queryOptions) {
- String groupByTrimThreshold =
queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
- return checkedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD,
groupByTrimThreshold);
- }
-
- private static Long checkedParseLong(String optionName, String optionValue,
int minValue) {
- try {
- if (optionValue != null) {
- Long value = Long.parseLong(optionValue);
- if (value < minValue) {
- throw longParseException(optionName, optionValue, minValue);
- }
- return value;
- } else {
- return null;
- }
- } catch (NumberFormatException nfe) {
- throw longParseException(optionName, optionValue, minValue);
- }
- }
-
- private static IllegalArgumentException longParseException(String
optionName, String optionValue, int minValue) {
- return new IllegalArgumentException(
- String.format("%s must be a number between %d and 2^63-1, got: %s",
optionName, minValue, optionValue));
- }
-
- private static Integer checkedParseInt(String optionName, String
optionValue) {
- try {
- if (optionValue != null) {
- int value = Integer.parseInt(optionValue);
- if (value < 0) {
- throw intParseException(optionName, optionValue);
- }
- return value;
- } else {
- return null;
- }
- } catch (NumberFormatException nfe) {
- throw intParseException(optionName, optionValue);
- }
- }
-
- private static IllegalArgumentException intParseException(String optionName,
String optionValue) {
- return new IllegalArgumentException(
- String.format("%s must be a number between 0 and 2^31-1, got: %s",
optionName, optionValue));
- }
-
public static boolean shouldDropResults(Map<String, String> queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
}
@@ -320,13 +282,13 @@ public class QueryOptionsUtils {
@Nullable
public static Integer getMaxStreamingPendingBlocks(Map<String, String>
queryOptions) {
String maxStreamingPendingBlocks =
queryOptions.get(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS);
- return checkedParseInt(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
maxStreamingPendingBlocks);
+ return
checkedParseIntPositive(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
maxStreamingPendingBlocks);
}
@Nullable
public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN);
- return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
+ return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN,
maxRowsInJoin);
}
@Nullable
@@ -338,7 +300,7 @@ public class QueryOptionsUtils {
@Nullable
public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) {
String maxRowsInWindow =
queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW);
- return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_WINDOW, maxRowsInWindow);
+ return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_WINDOW,
maxRowsInWindow);
}
@Nullable
@@ -354,4 +316,76 @@ public class QueryOptionsUtils {
public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
}
+
+ @Nullable
+ private static Integer uncheckedParseInt(String optionName, @Nullable String
optionValue) {
+ if (optionValue == null) {
+ return null;
+ }
+ try {
+ return Integer.parseInt(optionValue);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException(String.format("%s must be an integer,
got: %s", optionName, optionValue));
+ }
+ }
+
+ @Nullable
+ private static Integer checkedParseIntPositive(String optionName, @Nullable
String optionValue) {
+ return checkedParseInt(optionName, optionValue, 1);
+ }
+
+ @Nullable
+ private static Integer checkedParseIntNonNegative(String optionName,
@Nullable String optionValue) {
+ return checkedParseInt(optionName, optionValue, 0);
+ }
+
+ @Nullable
+ private static Integer checkedParseInt(String optionName, @Nullable String
optionValue, int minValue) {
+ if (optionValue == null) {
+ return null;
+ }
+ int value;
+ try {
+ value = Integer.parseInt(optionValue);
+ } catch (NumberFormatException nfe) {
+ throw intParseException(optionName, optionValue, minValue);
+ }
+ if (value < minValue) {
+ throw intParseException(optionName, optionValue, minValue);
+ }
+ return value;
+ }
+
+ private static IllegalArgumentException intParseException(String optionName,
String optionValue, int minValue) {
+ return new IllegalArgumentException(
+ String.format("%s must be a number between %d and 2^31-1, got: %s",
optionName, minValue, optionValue));
+ }
+
+ @Nullable
+ private static Long checkedParseLongPositive(String optionName, @Nullable
String optionValue) {
+ return checkedParseLong(optionName, optionValue, 1);
+ }
+
+ @Nullable
+ private static Long checkedParseLong(String optionName, @Nullable String
optionValue, long minValue) {
+ if (optionValue == null) {
+ return null;
+ }
+ long value;
+ try {
+ value = Long.parseLong(optionValue);
+ } catch (NumberFormatException nfe) {
+ throw longParseException(optionName, optionValue, minValue);
+ }
+ if (value < minValue) {
+ throw longParseException(optionName, optionValue, minValue);
+ }
+ return value;
+ }
+
+ private static IllegalArgumentException longParseException(String
optionName, @Nullable String optionValue,
+ long minValue) {
+ return new IllegalArgumentException(
+ String.format("%s must be a number between %d and 2^63-1, got: %s",
optionName, minValue, optionValue));
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index 6f0f469c5b..b2ca6573b6 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -19,110 +19,127 @@
package org.apache.pinot.common.utils.config;
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.testng.Assert;
import org.testng.annotations.Test;
import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
public class QueryOptionsUtilsTest {
+ private static final List<String> POSITIVE_INT_KEYS =
+ List.of(NUM_REPLICA_GROUPS_TO_QUERY, MAX_EXECUTION_THREADS,
NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_ROWS_IN_WINDOW);
+ private static final List<String> NON_NEGATIVE_INT_KEYS =
List.of(MULTI_STAGE_LEAF_LIMIT);
+ private static final List<String> UNBOUNDED_INT_KEYS =
+ List.of(MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE,
MIN_BROKER_GROUP_TRIM_SIZE,
+ GROUP_TRIM_THRESHOLD);
+ private static final List<String> INT_KEYS = new ArrayList<>() {{
+ addAll(POSITIVE_INT_KEYS);
+ addAll(NON_NEGATIVE_INT_KEYS);
+ addAll(UNBOUNDED_INT_KEYS);
+ }};
+ private static final List<String> POSITIVE_LONG_KEYS =
+ List.of(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES,
MAX_QUERY_RESPONSE_SIZE_BYTES);
@Test
public void shouldConvertCaseInsensitiveMapToUseCorrectValues() {
// Given:
- Map<String, String> configs = ImmutableMap.of(
- "ENABLENullHandling", "true",
- "useMULTISTAGEEngine", "false"
- );
+ Map<String, String> configs = Map.of("ENABLENullHandling", "true",
"useMULTISTAGEEngine", "false");
// When:
Map<String, String> resolved =
QueryOptionsUtils.resolveCaseInsensitiveOptions(configs);
// Then:
-
Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING),
"true");
-
Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE),
"false");
+ assertEquals(resolved.get(ENABLE_NULL_HANDLING), "true");
+ assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
}
@Test
public void testSkipIndexesParsing() {
String skipIndexesStr = "col1=inverted,range&col2=sorted";
- Map<String, String> queryOptions =
- Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES,
skipIndexesStr);
+ Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr);
Map<String, Set<FieldConfig.IndexType>> skipIndexes =
QueryOptionsUtils.getSkipIndexes(queryOptions);
- Assert.assertEquals(skipIndexes.get("col1"),
- Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED));
- Assert.assertEquals(skipIndexes.get("col2"),
- Set.of(FieldConfig.IndexType.SORTED));
+ assertEquals(skipIndexes.get("col1"), Set.of(FieldConfig.IndexType.RANGE,
FieldConfig.IndexType.INVERTED));
+ assertEquals(skipIndexes.get("col2"),
Set.of(FieldConfig.IndexType.SORTED));
}
@Test(expectedExceptions = RuntimeException.class)
public void testSkipIndexesParsingInvalid() {
String skipIndexesStr = "col1=inverted,range&col2";
- Map<String, String> queryOptions =
- Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES,
skipIndexesStr);
- QueryOptionsUtils.getSkipIndexes(queryOptions);
+ Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr);
+ QueryOptionsUtils.getSkipIndexes(queryOptions);
}
@Test
public void testIntegerSettingParseSuccess() {
HashMap<String, String> map = new HashMap<>();
- for (String setting : Arrays.asList(NUM_GROUPS_LIMIT,
MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT,
- GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN,
MAX_STREAMING_PENDING_BLOCKS,
- MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE,
MIN_SERVER_GROUP_TRIM_SIZE)) {
- map.clear();
- for (Integer val : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) {
- map.put(setting, val != null ? String.valueOf(val) : null);
- Assert.assertEquals(getValue(map, setting), val);
+ for (String key : INT_KEYS) {
+ for (Integer value : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) {
+ map.put(key, value != null ? String.valueOf(value) : null);
+ assertEquals(getValue(map, key), value);
}
}
- for (String setting : Arrays.asList(TIMEOUT_MS,
MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
- map.clear();
- for (Long val : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) {
- map.put(setting, val != null ? String.valueOf(val) : null);
- Assert.assertEquals(getValue(map, setting), val);
+ for (String key : POSITIVE_LONG_KEYS) {
+ for (Long value : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) {
+ map.put(key, value != null ? String.valueOf(value) : null);
+ assertEquals(getValue(map, key), value);
}
}
}
@Test
public void testIntegerSettingParseErrors() {
- HashMap<String, String> map = new HashMap<>();
+ for (String key : POSITIVE_INT_KEYS) {
+ for (String value : new String[]{"-10000000000", "-2147483648", "-1",
"0", "2147483648", "10000000000"}) {
+ try {
+ getValue(Map.of(key, value), key);
+ fail(key);
+ } catch (IllegalArgumentException ise) {
+ assertEquals(ise.getMessage(), key + " must be a number between 1
and 2^31-1, got: " + value);
+ }
+ }
+ }
+
+ for (String key : NON_NEGATIVE_INT_KEYS) {
+ for (String value : new String[]{"-10000000000", "-2147483648", "-1",
"2147483648", "10000000000"}) {
+ try {
+ getValue(Map.of(key, value), key);
+ fail();
+ } catch (IllegalArgumentException ise) {
+ assertEquals(ise.getMessage(), key + " must be a number between 0
and 2^31-1, got: " + value);
+ }
+ }
+ }
- for (String setting : Arrays.asList(NUM_GROUPS_LIMIT,
MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT,
- GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN,
MAX_STREAMING_PENDING_BLOCKS,
- MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE,
MIN_SERVER_GROUP_TRIM_SIZE)) {
- for (String val : new String[]{"-10000000000", "-2147483648", "-1",
"2147483648", "10000000000"}) {
- map.clear();
- map.put(setting, val);
+ for (String key : UNBOUNDED_INT_KEYS) {
+ for (String value : new String[]{"-10000000000", "2147483648",
"10000000000"}) {
try {
- getValue(map, setting);
- Assert.fail();
+ getValue(Map.of(key, value), key);
+ fail();
} catch (IllegalArgumentException ise) {
- Assert.assertEquals(ise.getMessage(), setting + " must be a number
between 0 and 2^31-1, got: " + val);
+ assertEquals(ise.getMessage(), key + " must be an integer, got: " +
value);
}
}
}
- for (String setting : Arrays.asList(TIMEOUT_MS,
MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
- for (String val : new String[]{
+ for (String key : POSITIVE_LONG_KEYS) {
+ for (String value : new String[]{
"-100000000000000000000", "-9223372036854775809", "-1", "0",
"9223372036854775808", "100000000000000000000"
}) {
- map.clear();
- map.put(setting, val);
try {
- getValue(map, setting);
- Assert.fail();
+ getValue(Map.of(key, value), key);
+ fail();
} catch (IllegalArgumentException ise) {
- Assert.assertEquals(ise.getMessage(), setting + " must be a number
between 1 and 2^63-1, got: " + val);
+ assertEquals(ise.getMessage(), key + " must be a number between 1
and 2^63-1, got: " + value);
}
}
}
@@ -130,26 +147,34 @@ public class QueryOptionsUtilsTest {
private static Object getValue(Map<String, String> map, String key) {
switch (key) {
- //ints
+ // Positive ints
+ case NUM_REPLICA_GROUPS_TO_QUERY:
+ return QueryOptionsUtils.getNumReplicaGroupsToQuery(map);
+ case MAX_EXECUTION_THREADS:
+ return QueryOptionsUtils.getMaxExecutionThreads(map);
case NUM_GROUPS_LIMIT:
return QueryOptionsUtils.getNumGroupsLimit(map);
case MAX_INITIAL_RESULT_HOLDER_CAPACITY:
return QueryOptionsUtils.getMaxInitialResultHolderCapacity(map);
- case MULTI_STAGE_LEAF_LIMIT:
- return QueryOptionsUtils.getMultiStageLeafLimit(map);
- case GROUP_TRIM_THRESHOLD:
- return QueryOptionsUtils.getGroupTrimThreshold(map);
case MAX_STREAMING_PENDING_BLOCKS:
return QueryOptionsUtils.getMaxStreamingPendingBlocks(map);
case MAX_ROWS_IN_JOIN:
return QueryOptionsUtils.getMaxRowsInJoin(map);
- case MAX_EXECUTION_THREADS:
- return QueryOptionsUtils.getMaxExecutionThreads(map);
+ case MAX_ROWS_IN_WINDOW:
+ return QueryOptionsUtils.getMaxRowsInWindow(map);
+ // Non-negative ints
+ case MULTI_STAGE_LEAF_LIMIT:
+ return QueryOptionsUtils.getMultiStageLeafLimit(map);
+ // Unbounded ints
case MIN_SEGMENT_GROUP_TRIM_SIZE:
return QueryOptionsUtils.getMinSegmentGroupTrimSize(map);
case MIN_SERVER_GROUP_TRIM_SIZE:
return QueryOptionsUtils.getMinServerGroupTrimSize(map);
- //longs
+ case MIN_BROKER_GROUP_TRIM_SIZE:
+ return QueryOptionsUtils.getMinBrokerGroupTrimSize(map);
+ case GROUP_TRIM_THRESHOLD:
+ return QueryOptionsUtils.getGroupTrimThreshold(map);
+ // Positive longs
case TIMEOUT_MS:
return QueryOptionsUtils.getTimeoutMs(map);
case MAX_SERVER_RESPONSE_SIZE_BYTES:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 0b928b9a17..ecb0a56cbf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -86,7 +87,8 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
// without ordering. Consider ordering on group-by columns if no
ordering is specified.
_trimSize = limit;
}
- _trimThreshold = queryContext.getGroupTrimThreshold();
+ int trimThreshold = queryContext.getGroupTrimThreshold();
+ _trimThreshold = trimThreshold > 0 ? trimThreshold : Integer.MAX_VALUE;
} else {
// Server trim is disabled
_trimSize = Integer.MAX_VALUE;
@@ -135,16 +137,20 @@ public class GroupByCombineOperator extends
BaseSingleBlockCombineOperator<Group
synchronized (this) {
if (_indexedTable == null) {
DataSchema dataSchema = resultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server size.
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ // NOTE: Use trimSize as resultSize on server side.
+ if (_numTasks == 1) {
+ _indexedTable = new SimpleIndexedTable(dataSchema,
_queryContext, _trimSize, _trimSize, _trimThreshold);
} else {
- _indexedTable =
- new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
+ if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+ // special case of trim threshold where it is set to max
value.
+ // there won't be any trimming during upsert in this case.
+ // thus we can avoid the overhead of read-lock and write-lock
+ // in the upsert method.
+ _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ } else {
+ _indexedTable =
+ new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
+ }
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
index 943ed169a1..1e8c88e9ce 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -163,16 +164,20 @@ public class StreamingGroupByCombineOperator extends
BaseStreamingCombineOperato
synchronized (this) {
if (_indexedTable == null) {
DataSchema dataSchema = resultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server size.
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ // NOTE: Use trimSize as resultSize on server side.
+ if (_numTasks == 1) {
+ _indexedTable = new SimpleIndexedTable(dataSchema,
_queryContext, _trimSize, _trimSize, _trimThreshold);
} else {
- _indexedTable =
- new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
+ if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+ // special case of trim threshold where it is set to max
value.
+ // there won't be any trimming during upsert in this case.
+ // thus we can avoid the overhead of read-lock and write-lock
+ // in the upsert method.
+ _indexedTable = new
UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ } else {
+ _indexedTable =
+ new ConcurrentIndexedTable(dataSchema, _queryContext,
_trimSize, _trimSize, _trimThreshold);
+ }
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 6912dd2586..e76a649886 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -181,7 +181,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
// Set maxExecutionThreads
int maxExecutionThreads;
Integer maxExecutionThreadsFromQuery =
QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
- if (maxExecutionThreadsFromQuery != null && maxExecutionThreadsFromQuery >
0) {
+ if (maxExecutionThreadsFromQuery != null) {
// Do not allow query to override the execution threads over the
instance-level limit
if (_maxExecutionThreads > 0) {
maxExecutionThreads = Math.min(_maxExecutionThreads,
maxExecutionThreadsFromQuery);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 09b4d6a156..34395febfb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -240,11 +240,23 @@ public class GroupByDataTableReducer implements
DataTableReducer {
boolean hasFinalInput =
_queryContext.isServerReturnFinalResult() ||
_queryContext.isServerReturnFinalResultKeyUnpartitioned();
int limit = _queryContext.getLimit();
- int trimSize = GroupByUtils.getTableCapacity(limit,
reducerContext.getMinGroupTrimSize());
+ int minTrimSize = reducerContext.getMinGroupTrimSize();
+ int trimSize;
+ int trimThreshold;
+ if (minTrimSize > 0) {
+ trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
+ trimThreshold = reducerContext.getGroupByTrimThreshold();
+ if (trimThreshold <= 0) {
+ trimThreshold = Integer.MAX_VALUE;
+ }
+ } else {
+ // Broker trim is disabled
+ trimSize = Integer.MAX_VALUE;
+ trimThreshold = Integer.MAX_VALUE;
+ }
// NOTE: For query with HAVING clause, use trimSize as resultSize to
ensure the result accuracy.
// TODO: Resolve the HAVING clause within the IndexedTable before
returning the result
int resultSize = _queryContext.getHavingFilter() != null ? trimSize :
limit;
- int trimThreshold = reducerContext.getGroupByTrimThreshold();
IndexedTable indexedTable;
if (numReduceThreadsToUse == 1) {
indexedTable =
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
index 4f062d522a..17487a7d13 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
@@ -142,65 +142,53 @@ public class ArrayAggFunctionTest extends
AbstractAggregationFunctionTest {
@Test
void aggregationGroupBySVIntWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable
group by myField")
- .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new
int[]{2, 2}},
- new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE,
Integer.MIN_VALUE}});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'INT') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE,
Integer.MIN_VALUE}},
+ new Object[]{1, new int[]{1, 1}},
+ new Object[]{2, new int[]{2, 2}}
+ );
}
@Test
void aggregationGroupBySVIntWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable
group by myField")
- .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new
int[]{2, 2}},
- new Object[]{null, new int[0]});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'INT') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1, new int[]{1, 1}},
+ new Object[]{2, new int[]{2, 2}},
+ new Object[]{null, new int[0]}
+ );
}
@Test
void aggregationDistinctGroupBySVIntWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'INT', true) from
testTable group by myField")
- .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new
int[]{2}},
- new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'INT', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}},
+ new Object[]{1, new int[]{1}},
+ new Object[]{2, new int[]{2}}
+ );
}
@Test
void aggregationDistinctGroupBySVIntWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'INT', true) from
testTable group by myField")
- .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new
int[]{2}},
- new Object[]{null, new int[0]});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'INT', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1, new int[]{1}},
+ new Object[]{2, new int[]{2}},
+ new Object[]{null, new int[0]}
+ );
}
@Test
@@ -257,65 +245,53 @@ public class ArrayAggFunctionTest extends
AbstractAggregationFunctionTest {
@Test
void aggregationGroupBySVLongWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable
group by myField")
- .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new
long[]{2, 2}},
- new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE,
Long.MIN_VALUE}});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE,
Long.MIN_VALUE}},
+ new Object[]{1L, new long[]{1, 1}},
+ new Object[]{2L, new long[]{2, 2}}
+ );
}
@Test
void aggregationGroupBySVLongWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable
group by myField")
- .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new
long[]{2, 2}},
- new Object[]{null, new long[0]});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1L, new long[]{1, 1}},
+ new Object[]{2L, new long[]{2, 2}},
+ new Object[]{null, new long[0]}
+ );
}
@Test
void aggregationDistinctGroupBySVLongWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from
testTable group by myField")
- .thenResultIs(new Object[]{1L, new long[]{1}}, new Object[]{2L, new
long[]{2}},
- new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'LONG', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}},
+ new Object[]{1L, new long[]{1}},
+ new Object[]{2L, new long[]{2}}
+ );
}
@Test
void aggregationDistinctGroupBySVLongWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
- .onFirstInstance("myField",
- "1",
- "2",
- "null"
- ).andOnSecondInstance("myField",
- "1",
- "2",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from
testTable group by myField")
- .thenResultIs(new Object[]{1L, new long[]{1}}, new Object[]{2L, new
long[]{2}},
- new Object[]{null, new long[0]});
+ .onFirstInstance("myField", "1", "2", "null")
+ .andOnSecondInstance("myField", "1", "2", "null")
+ .whenQuery("select myField, arrayagg(myField, 'LONG', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1L, new long[]{1}},
+ new Object[]{2L, new long[]{2}},
+ new Object[]{null, new long[0]}
+ );
}
@Test
@@ -373,60 +349,51 @@ public class ArrayAggFunctionTest extends
AbstractAggregationFunctionTest {
@Test
void aggregationGroupBySVFloatWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable
group by myField")
- .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY,
- new float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}},
- new Object[]{1.0f, new float[]{1.0f, 1.0f}}, new Object[]{2.0f,
new float[]{2.0f, 2.0f}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Float.NEGATIVE_INFINITY, new
float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}},
+ new Object[]{1.0f, new float[]{1.0f, 1.0f}},
+ new Object[]{2.0f, new float[]{2.0f, 2.0f}}
+ );
}
@Test
void aggregationGroupBySVFloatWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true)
- .onFirstInstance("myField",
- "null",
- "1.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0"
- ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable
group by myField")
- .thenResultIs(new Object[]{null, new float[0]}, new Object[]{1.0f, new
float[]{1.0f, 1.0f}});
+ .onFirstInstance("myField", "null", "1.0")
+ .andOnSecondInstance("myField", "null", "1.0")
+ .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1.0f, new float[]{1.0f, 1.0f}},
+ new Object[]{null, new float[0]}
+ );
}
@Test
void aggregationDistinctGroupBySVFloatWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from
testTable group by myField")
- .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY, new
float[]{Float.NEGATIVE_INFINITY}},
- new Object[]{1.0f, new float[]{1.0f}}, new Object[]{2.0f, new
float[]{2.0f}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Float.NEGATIVE_INFINITY, new
float[]{Float.NEGATIVE_INFINITY}},
+ new Object[]{1.0f, new float[]{1.0f}},
+ new Object[]{2.0f, new float[]{2.0f}}
+ );
}
@Test
void aggregationDistinctGroupBySVFloatWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true)
- .onFirstInstance("myField",
- "null",
- "1.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0"
- ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from
testTable group by myField")
- .thenResultIs(new Object[]{null, new float[0]}, new Object[]{1.0f, new
float[]{1.0f}});
+ .onFirstInstance("myField", "null", "1.0")
+ .andOnSecondInstance("myField", "null", "1.0")
+ .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1.0f, new float[]{1.0f}},
+ new Object[]{null, new float[0]}
+ );
}
@Test
@@ -484,66 +451,53 @@ public class ArrayAggFunctionTest extends
AbstractAggregationFunctionTest {
@Test
void aggregationGroupBySVDoubleWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from
testTable group by myField")
- .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new
double[]{Double.NEGATIVE_INFINITY,
- Double.NEGATIVE_INFINITY}}, new Object[]{1.0, new
double[]{1.0, 1.0}},
- new Object[]{2.0, new double[]{2.0, 2.0}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Double.NEGATIVE_INFINITY, new
double[]{Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY}},
+ new Object[]{1.0, new double[]{1.0, 1.0}},
+ new Object[]{2.0, new double[]{2.0, 2.0}}
+ );
}
@Test
void aggregationGroupBySVDoubleWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from
testTable group by myField")
- .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new
double[]{1.0, 1.0}},
- new Object[]{2.0, new double[]{2.0, 2.0}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1.0, new double[]{1.0, 1.0}},
+ new Object[]{2.0, new double[]{2.0, 2.0}},
+ new Object[]{null, new double[0]}
+ );
}
@Test
void aggregationDistinctGroupBySVDoubleWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from
testTable group by myField")
- .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new
double[]{Double.NEGATIVE_INFINITY}},
- new Object[]{1.0, new double[]{1.0}}, new Object[]{2.0, new
double[]{2.0}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{Double.NEGATIVE_INFINITY, new
double[]{Double.NEGATIVE_INFINITY}},
+ new Object[]{1.0, new double[]{1.0}},
+ new Object[]{2.0, new double[]{2.0}}
+ );
}
@Test
void aggregationDistinctGroupBySVDoubleWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true)
- .onFirstInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).andOnSecondInstance("myField",
- "null",
- "1.0",
- "2.0"
- ).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from
testTable group by myField")
- .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new
double[]{1.0}},
- new Object[]{2.0, new double[]{2.0}});
+ .onFirstInstance("myField", "null", "1.0", "2.0")
+ .andOnSecondInstance("myField", "null", "1.0", "2.0")
+ .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{1.0, new double[]{1.0}},
+ new Object[]{2.0, new double[]{2.0}},
+ new Object[]{null, new double[0]}
+ );
}
@Test
@@ -600,65 +554,53 @@ public class ArrayAggFunctionTest extends
AbstractAggregationFunctionTest {
@Test
void aggregationGroupBySVStringWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false)
- .onFirstInstance("myField",
- "a",
- "b",
- "null"
- ).andOnSecondInstance("myField",
- "a",
- "b",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'STRING') from
testTable group by myField")
- .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new
Object[]{"b", new String[]{"b", "b"}},
- new Object[]{"null", new String[]{"null", "null"}});
+ .onFirstInstance("myField", "a", "b", "null")
+ .andOnSecondInstance("myField", "a", "b", "null")
+ .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{"a", new String[]{"a", "a"}},
+ new Object[]{"b", new String[]{"b", "b"}},
+ new Object[]{"null", new String[]{"null", "null"}}
+ );
}
@Test
void aggregationGroupBySVStringWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true)
- .onFirstInstance("myField",
- "a",
- "b",
- "null"
- ).andOnSecondInstance("myField",
- "a",
- "b",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'STRING') from
testTable group by myField")
- .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new
Object[]{"b", new String[]{"b", "b"}},
- new Object[]{null, new String[0]});
+ .onFirstInstance("myField", "a", "b", "null")
+ .andOnSecondInstance("myField", "a", "b", "null")
+ .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable
group by myField order by myField")
+ .thenResultIs(
+ new Object[]{"a", new String[]{"a", "a"}},
+ new Object[]{"b", new String[]{"b", "b"}},
+ new Object[]{null, new String[0]}
+ );
}
@Test
void aggregationDistinctGroupBySVStringWithNullHandlingDisabled() {
new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false)
- .onFirstInstance("myField",
- "a",
- "b",
- "null"
- ).andOnSecondInstance("myField",
- "a",
- "b",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'STRING', true) from
testTable group by myField")
- .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b",
new String[]{"b"}},
- new Object[]{"null", new String[]{"null"}});
+ .onFirstInstance("myField", "a", "b", "null")
+ .andOnSecondInstance("myField", "a", "b", "null")
+ .whenQuery("select myField, arrayagg(myField, 'STRING', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{"a", new String[]{"a"}},
+ new Object[]{"b", new String[]{"b"}},
+ new Object[]{"null", new String[]{"null"}}
+ );
}
@Test
void aggregationDistinctGroupBySVStringWithNullHandlingEnabled() {
new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true)
- .onFirstInstance("myField",
- "a",
- "b",
- "null"
- ).andOnSecondInstance("myField",
- "a",
- "b",
- "null"
- ).whenQuery("select myField, arrayagg(myField, 'STRING', true) from
testTable group by myField")
- .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b",
new String[]{"b"}},
- new Object[]{null, new String[0]});
+ .onFirstInstance("myField", "a", "b", "null")
+ .andOnSecondInstance("myField", "a", "b", "null")
+ .whenQuery("select myField, arrayagg(myField, 'STRING', true) from
testTable group by myField order by myField")
+ .thenResultIs(
+ new Object[]{"a", new String[]{"a"}},
+ new Object[]{"b", new String[]{"b"}},
+ new Object[]{null, new String[0]}
+ );
}
@Test
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java
deleted file mode 100644
index 654a0add08..0000000000
---
a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES;
-import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES;
-import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS;
-
-
-public class WithOptionQueriesTest extends BaseQueriesTest {
-
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"WithOptionQueriesTest");
- private static final String RAW_TABLE_NAME = "testTable";
- private static final String SEGMENT_NAME = "testSegment";
-
- private static final int NUM_RECORDS = 10;
- private static final String X_COL = "x";
- private static final String Y_COL = "y";
-
- private static final Schema SCHEMA = new
Schema.SchemaBuilder().addSingleValueDimension(X_COL, FieldSpec.DataType.INT)
- .addSingleValueDimension(Y_COL, FieldSpec.DataType.DOUBLE).build();
-
- private static final TableConfig TABLE_CONFIG =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-
- private IndexSegment _indexSegment;
- private List<IndexSegment> _indexSegments;
-
- @Override
- protected String getFilter() {
- return "";
- }
-
- @Override
- protected IndexSegment getIndexSegment() {
- return _indexSegment;
- }
-
- @Override
- protected List<IndexSegment> getIndexSegments() {
- return _indexSegments;
- }
-
- private final List<Object[]> _allRecords = new ArrayList<>();
-
- @BeforeClass
- public void setUp()
- throws Exception {
- FileUtils.deleteQuietly(INDEX_DIR);
-
- List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
- for (int i = 0; i < NUM_RECORDS; i++) {
- GenericRow record = new GenericRow();
- record.putValue(X_COL, i);
- record.putValue(Y_COL, 0.25);
- records.add(record);
- _allRecords.add(new Object[]{i, 0.25});
- }
-
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
- segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
- segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
- SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
- driver.build();
-
- ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
- _indexSegment = immutableSegment;
- _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
- }
-
- @Test
- public void testOptionParsingFailure() {
- HashMap<String, String> options = new HashMap<>();
-
- // int values
- for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
- QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
QueryOptionKey.GROUP_TRIM_THRESHOLD,
- QueryOptionKey.MAX_EXECUTION_THREADS,
QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
- QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE,
QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE)) {
-
- options.clear();
- for (String value : new String[]{"-10000000000", "-2147483648", "-1",
"2147483648", "10000000000"}) {
- options.put(setting, value);
-
- IllegalArgumentException exception =
Assert.expectThrows(IllegalArgumentException.class, () -> {
- getBrokerResponse("SELECT x, count(*) FROM " + RAW_TABLE_NAME + "
GROUP BY x", options);
- });
- Assert.assertEquals(setting + " must be a number between 0 and 2^31-1,
got: " + value, exception.getMessage());
- }
- }
- }
-
- @Test
- public void testOptionParsingSuccess() {
- HashMap<String, String> options = new HashMap<>();
- List<Object> groupRows = new ArrayList();
- groupRows.add(new Object[]{0d, 40L}); //four times 10 records because
segment gets multiplied under the hood
-
- // int values
- for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
- QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
- QueryOptionKey.GROUP_TRIM_THRESHOLD,
QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
- QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS,
- QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE,
- QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE,
QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY)) {
-
- options.clear();
- for (String value : new String[]{"0", "1", "10000", "2147483647"}) {
-
- options.put(setting, value);
- List<Object[]> rows =
- getBrokerResponse("SELECT mod(x,1), count(*) FROM " +
RAW_TABLE_NAME + " GROUP BY mod(x,1)",
- options).getResultTable().getRows();
- if (QueryOptionKey.NUM_GROUPS_LIMIT == setting && "0".equals(value)) {
- Assert.assertEquals(0, rows.size());
- } else {
- assertEquals(rows, groupRows);
- }
- }
- }
-
- //long values
- for (String setting : Arrays.asList(TIMEOUT_MS,
MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
- options.clear();
- for (String value : new String[]{"1", "10000", "9223372036854775807"}) {
- options.put(setting, value);
- List<Object[]> rows = getBrokerResponse("SELECT * FROM " +
RAW_TABLE_NAME, options).getResultTable().getRows();
- assertEquals(rows, _allRecords);
- }
- }
- }
-
- private void assertEquals(List actual, List expected) {
- if (actual == expected) {
- return;
- }
-
- if (actual == null || expected == null || actual.size() !=
expected.size()) {
- Assert.fail("Expected \n" + expected + "\n but got \n" + actual);
- }
-
- for (int i = 0; i < actual.size(); i++) {
- Assert.assertEquals(actual.get(i), expected.get(i));
- }
- }
-}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 3e946d5eab..b70b39a8f5 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
-import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
@@ -48,6 +47,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
+
/**
* all special tests that doesn't fit into {@link
org.apache.pinot.query.runtime.queries.ResourceBasedQueriesTest}
@@ -69,7 +70,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
SCHEMA_BUILDER = new
Schema.SchemaBuilder().addSingleValueDimension("col1",
FieldSpec.DataType.STRING, "")
.addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS")
- .addMetric("col3", FieldSpec.DataType.INT,
0).setSchemaName("defaultSchemaName")
+ .addMetric("col3", FieldSpec.DataType.INT, 0)
+ .setSchemaName("defaultSchemaName")
.setEnableColumnBasedNullHandling(true);
}
@@ -293,69 +295,59 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@DataProvider(name = "testDataWithSqlExecutionExceptions")
protected Iterator<Object[]> provideTestSqlWithExecutionException() {
- //@formatter:off
- List<Object[]> testCases = new ArrayList();
- testCases.addAll(
- Arrays.asList(
- // Missing index
- new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f') LIMIT 10",
"without text index"},
-
- // Query hint with dynamic broadcast pipeline breaker should return
error upstream
- new Object[]{
- "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1
FROM a WHERE a.col1 IN "
- + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND
a.col3 > 0",
- "without text index"
- },
-
- // Timeout exception should occur with this option:
- new Object[]{"SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 =
b.col1 JOIN c ON a.col1 = c.col1",
- "Timeout"},
-
- // Function with incorrect argument signature should throw runtime
exception when casting string to numeric
- new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON a.col1 =
b.col1", "For input string:"},
-
- // Scalar function that doesn't have a valid use should throw an
exception on the leaf stage
- // - predicate only functions:
- new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')", "without
text index"},
- new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')", "without
text index"},
- new Object[]{"SELECT * FROM a WHERE textContains(col1, 'f')",
"supported only on native text index"},
- new Object[]{"SELECT * FROM a WHERE text_contains(col1, 'f')",
"supported only on native text index"},
-
- // - transform only functions
- new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a", "was
expecting (JSON String"},
- new Object[]{"SELECT json_extract_key(col1, 'path') FROM a", "was
expecting (JSON String"},
-
- // - PlaceholderScalarFunction registered will throw on intermediate
stage, but works on leaf stage.
- // - checked "Illegal Json Path" as col1 is not actually a json
string, but the call is correctly triggered.
- new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS
INT) FROM a", "Cannot resolve JSON path"},
- // - checked function cannot be found b/c there's no intermediate
stage impl for json_extract_scalar
- new Object[]{
- "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT)
FROM a JOIN b ON a.col1 = b.col1",
- "Unsupported function: JSONEXTRACTSCALAR"
- }));
- //@formatter:on
-
- // int values
- for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
- QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
- QueryOptionKey.GROUP_TRIM_THRESHOLD,
QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
- QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS,
- QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE)) {
-
- for (String val : new String[]{"-10000000000", "-2147483648", "-1",
"2147483648", "10000000000"}) {
- testCases.add(new Object[]{
- "set " + setting + " = " + val + "; SELECT col1, count(*) FROM a
GROUP BY col1",
- setting + " must be a number between 0 and 2^31-1, got: " + val
- });
- }
- }
-
- // int values; triggered for query with window clause
- for (String setting : Arrays.asList(QueryOptionKey.MAX_ROWS_IN_WINDOW)) {
- for (String val : new String[]{"-10000000000", "-2147483648", "-1",
"2147483648", "10000000000"}) {
+ List<Object[]> testCases = new ArrayList<>();
+ // Missing index
+ testCases.add(new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f')
LIMIT 10", "without text index"});
+
+ // Query hint with dynamic broadcast pipeline breaker should return error
upstream
+ testCases.add(new Object[]{
+ "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1
FROM a WHERE a.col1 IN "
+ + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 >
0",
+ "without text index"
+ });
+
+ // Timeout exception should occur with this option:
+ testCases.add(new Object[]{
+ "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c
ON a.col1 = c.col1",
+ "Timeout"
+ });
+
+ // Function with incorrect argument signature should throw runtime
exception when casting string to numeric
+ testCases.add(new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON
a.col1 = b.col1", "For input string:"});
+
+ // Scalar function that doesn't have a valid use should throw an exception
on the leaf stage
+ // - predicate only functions:
+ testCases.add(new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')",
"without text index"});
+ testCases.add(new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')",
"without text index"});
+ testCases.add(new Object[]{"SELECT * FROM a WHERE textContains(col1,
'f')", "supported only on native text index"});
+ testCases.add(new Object[]{
+ "SELECT * FROM a WHERE text_contains(col1, 'f')",
+ "supported only on native text index"}
+ );
+
+ // - transform only functions
+ testCases.add(new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a",
"was expecting (JSON String"});
+ testCases.add(new Object[]{"SELECT json_extract_key(col1, 'path') FROM a",
"was expecting (JSON String"});
+
+ // - PlaceholderScalarFunction registered will throw on intermediate
stage, but works on leaf stage.
+ // - checked "Illegal Json Path" as col1 is not actually a json string,
but the call is correctly triggered.
+ testCases.add(
+ new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS
INT) FROM a", "Cannot resolve JSON path"});
+ // - checked function cannot be found b/c there's no intermediate stage
impl for json_extract_scalar
+ testCases.add(new Object[]{
+ "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT) FROM a
JOIN b ON a.col1 = b.col1",
+ "Unsupported function: JSONEXTRACTSCALAR"
+ });
+
+ // Positive int keys (only included ones that will be parsed for this
query)
+ for (String key : new String[]{
+ MAX_EXECUTION_THREADS, NUM_GROUPS_LIMIT,
MAX_INITIAL_RESULT_HOLDER_CAPACITY, MAX_STREAMING_PENDING_BLOCKS,
+ MAX_ROWS_IN_JOIN
+ }) {
+ for (String value : new String[]{"-10000000000", "-2147483648", "-1",
"0", "2147483648", "10000000000"}) {
testCases.add(new Object[]{
- "set " + setting + " = " + val + "; SELECT ROW_NUMBER() over
(PARTITION BY col1) FROM a",
- setting + " must be a number between 0 and 2^31-1, got: " + val
+ "set " + key + " = " + value + "; SELECT col1, count(*) FROM a
GROUP BY col1",
+ key + " must be a number between 1 and 2^31-1, got: " + value
});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]