This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ac664859004 additional validation for projection schema (#18524)
ac664859004 is described below
commit ac66485900448ace4ff280ab60e4098c1fc9619a
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Sep 18 13:36:55 2025 -0700
additional validation for projection schema (#18524)
---
.../data/input/impl/AggregateProjectionSpec.java | 31 ++++--
.../input/impl/AggregateProjectionSpecTest.java | 65 ++++++++++--
.../druid/segment/CursorFactoryProjectionTest.java | 22 +++-
.../apache/druid/segment/indexing/DataSchema.java | 114 +++++++++++++++------
.../druid/segment/indexing/DataSchemaTest.java | 74 +++++++++++++
5 files changed, 254 insertions(+), 52 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
index 42fc58eb15f..ff742b88f0e 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -37,6 +38,7 @@ import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTimeZone;
@@ -225,19 +227,34 @@ public class AggregateProjectionSpec
Granularity granularity = null;
// determine the granularity and time column name for the projection,
based on the finest time-like grouping column.
- for (final DimensionSchema dimension : groupingColumns) {
- ordering.add(OrderBy.ascending(dimension.getName()));
- if (ColumnHolder.TIME_COLUMN_NAME.equals(dimension.getName())) {
- timeColumnName = dimension.getName();
+ for (final DimensionSchema groupingColumn : groupingColumns) {
+ ordering.add(OrderBy.ascending(groupingColumn.getName()));
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(groupingColumn.getName())) {
+ // time must be a LONG type
+ if (!groupingColumn.getColumnType().is(ValueType.LONG)) {
+ throw DruidException
+ .forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "Encountered grouping column[%s] with incorrect type[%s].
Type must be 'long'.",
+ groupingColumn.getName(),
+ groupingColumn.getColumnType()
+ );
+ }
+ timeColumnName = groupingColumn.getName();
      // already found exact __time grouping, stop searching
      granularity = Granularities.NONE;
break;
} else {
- final VirtualColumn vc =
virtualColumns.getVirtualColumn(dimension.getName());
+        // skip non-LONG grouping columns; only LONG columns can be time-like
+ if (!groupingColumn.getColumnType().is(ValueType.LONG)) {
+ continue;
+ }
+ final VirtualColumn vc =
virtualColumns.getVirtualColumn(groupingColumn.getName());
final Granularity maybeGranularity =
Granularities.fromVirtualColumn(vc);
if (maybeGranularity == null ||
maybeGranularity.equals(Granularities.ALL)) {
// no __time in inputs or not supported, skip
} else if (Granularities.NONE.equals(maybeGranularity)) {
- timeColumnName = dimension.getName();
+ timeColumnName = groupingColumn.getName();
          // already found exact __time grouping, stop searching
          granularity = Granularities.NONE;
break;
} else if (maybeGranularity.getClass().equals(PeriodGranularity.class)
@@ -245,7 +262,7 @@ public class AggregateProjectionSpec
&& ((PeriodGranularity) maybeGranularity).getOrigin() == null
&& (granularity == null ||
maybeGranularity.isFinerThan(granularity))) {
// found a finer period granularity than the existing granularity,
or it's the first one
- timeColumnName = dimension.getName();
+ timeColumnName = groupingColumn.getName();
granularity = maybeGranularity;
}
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
index 24f7d2741a4..6c33ec57a11 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -112,12 +113,18 @@ class AggregateProjectionSpecTest extends
InitializedNullHandlingTest
ColumnType.LONG,
TestExprMacroTable.INSTANCE
);
- ExpressionVirtualColumn ptEvery10Min = new ExpressionVirtualColumn(
- "ptEvery10Min",
+ ExpressionVirtualColumn every10MinLA = new ExpressionVirtualColumn(
+ "every10MinLA",
"timestamp_floor(__time, 'PT10M', null, 'America/Los_Angeles')",
ColumnType.LONG,
TestExprMacroTable.INSTANCE
);
+ ExpressionVirtualColumn every10Min = new ExpressionVirtualColumn(
+ "every10Min",
+ "timestamp_floor(__time, 'PT10M', null, null)",
+ ColumnType.LONG,
+ TestExprMacroTable.INSTANCE
+ );
ExpressionVirtualColumn every90Min = new ExpressionVirtualColumn(
"every90Min",
"timestamp_floor(__time, 'PT1H30M', null, null)",
@@ -128,11 +135,11 @@ class AggregateProjectionSpecTest extends
InitializedNullHandlingTest
Assertions.assertEquals("hourly", new AggregateProjectionSpec(
"some_projection",
null,
- VirtualColumns.create(daily, hourly, ptEvery10Min),
+ VirtualColumns.create(daily, hourly, every10MinLA),
List.of(
new LongDimensionSchema("daily"),
new LongDimensionSchema("hourly"),
- new LongDimensionSchema("ptEvery10Min")
+ new LongDimensionSchema("every10MinLA")
),
new AggregatorFactory[]{}
).toMetadataSchema().getTimeColumnName());
@@ -140,16 +147,39 @@ class AggregateProjectionSpecTest extends
InitializedNullHandlingTest
Assertions.assertNull(new AggregateProjectionSpec(
"some_projection",
null,
- VirtualColumns.create(ptEvery10Min),
- List.of(new LongDimensionSchema("ptEvery10Min")),
+ VirtualColumns.create(every10MinLA),
+ List.of(new LongDimensionSchema("every10MinLA")),
new AggregatorFactory[]{}
).toMetadataSchema().getTimeColumnName());
Assertions.assertEquals("every90Min", new AggregateProjectionSpec(
"some_projection",
null,
- VirtualColumns.create(every90Min, ptEvery10Min),
- List.of(new LongDimensionSchema("every90Min"), new
LongDimensionSchema("ptEvery10Min")),
+ VirtualColumns.create(every90Min, every10MinLA),
+ List.of(new LongDimensionSchema("every90Min"), new
LongDimensionSchema("every10MinLA")),
+ new AggregatorFactory[]{}
+ ).toMetadataSchema().getTimeColumnName());
+
+ Assertions.assertEquals("every10Min", new AggregateProjectionSpec(
+ "some_projection",
+ null,
+ VirtualColumns.create(daily, hourly, every10Min),
+ List.of(
+ new LongDimensionSchema("daily"),
+ new LongDimensionSchema("hourly"),
+ new LongDimensionSchema("every10Min")
+ ),
+ new AggregatorFactory[]{}
+ ).toMetadataSchema().getTimeColumnName());
+ Assertions.assertEquals("hourly", new AggregateProjectionSpec(
+ "some_projection",
+ null,
+ VirtualColumns.create(daily, hourly, every10Min),
+ List.of(
+ new LongDimensionSchema("daily"),
+ new LongDimensionSchema("hourly"),
+ new StringDimensionSchema("every10Min")
+ ),
new AggregatorFactory[]{}
).toMetadataSchema().getTimeColumnName());
}
@@ -217,6 +247,25 @@ class AggregateProjectionSpecTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ void testInvalidTimeColumnType()
+ {
+ Throwable t = Assertions.assertThrows(
+ DruidException.class,
+ () -> new AggregateProjectionSpec(
+ "projection",
+ null,
+ VirtualColumns.EMPTY,
+ List.of(new StringDimensionSchema(ColumnHolder.TIME_COLUMN_NAME)),
+ null
+ )
+ );
+ Assertions.assertEquals(
+ "Encountered grouping column[__time] with incorrect type[STRING]. Type
must be 'long'.",
+ t.getMessage()
+ );
+ }
+
@Test
void testEqualsAndHashcode()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index ba8506ac5d6..23798992e50 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -130,6 +130,12 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.add("f",
ColumnType.NESTED_DATA)
.build();
+ private static final Set<String> PROJECTION_TIME_COLUMNS = Set.of(
+ ColumnHolder.TIME_COLUMN_NAME,
+ Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME,
+ "__gran"
+ );
+
public static List<InputRow> makeRows(List<String> dimensions)
{
return Arrays.asList(
@@ -388,7 +394,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.groupingColumns(
projection.getGroupingColumns()
.stream()
- .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+
.map(CursorFactoryProjectionTest::toAutoColumn)
.collect(Collectors.toList())
)
.build()
@@ -403,7 +409,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.builder(projection)
.groupingColumns(projection.getGroupingColumns()
.stream()
- .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+
.map(CursorFactoryProjectionTest::toAutoColumn)
.collect(Collectors.toList()))
.build()
)
@@ -451,12 +457,12 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
.stream()
- .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+
.map(CursorFactoryProjectionTest::toAutoColumn)
.collect(Collectors.toList());
List<DimensionSchema> rollupAutoDims = rollupDimsOrdered.getDimensions()
.stream()
- .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+
.map(CursorFactoryProjectionTest::toAutoColumn)
.collect(Collectors.toList());
for (boolean incremental : new boolean[]{true, false}) {
@@ -2069,6 +2075,14 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
Assert.assertEquals(expectedRowCount, rowCount);
}
}
+
+ private static AutoTypeColumnSchema toAutoColumn(DimensionSchema x)
+ {
+ if (PROJECTION_TIME_COLUMNS.contains(x.getName())) {
+ return new AutoTypeColumnSchema(x.getName(), ColumnType.LONG);
+ }
+ return new AutoTypeColumnSchema(x.getName(), null);
+ }
private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec,
boolean autoSchema, boolean writeNullColumns)
{
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 782ed8580b7..6e9728d1b55 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -415,9 +415,88 @@ public class DataSchema
}
}
+ return getFieldsOrThrowIfErrors(fields);
+ }
+
+ /**
+ * Validates that each {@link AggregateProjectionSpec} does not have
duplicate column names in
+ * {@link AggregateProjectionSpec#groupingColumns} and {@link
AggregateProjectionSpec#aggregators} and that segment
+ * {@link Granularity} is at least as coarse as {@link
AggregateProjectionMetadata.Schema#effectiveGranularity}
+ */
+ public static void validateProjections(
+ @Nullable List<AggregateProjectionSpec> projections,
+ @Nullable Granularity segmentGranularity
+ )
+ {
+ if (projections != null) {
+ final Set<String> names =
Sets.newHashSetWithExpectedSize(projections.size());
+ for (AggregateProjectionSpec projection : projections) {
+ if (names.contains(projection.getName())) {
+ throw InvalidInput.exception("projection[%s] is already defined,
projection names must be unique", projection.getName());
+ }
+ names.add(projection.getName());
+ final AggregateProjectionMetadata.Schema schema =
projection.toMetadataSchema();
+
+ if (schema.getTimeColumnName() != null) {
+ final Granularity projectionGranularity =
schema.getEffectiveGranularity();
+ if (segmentGranularity != null) {
+ if (segmentGranularity.isFinerThan(projectionGranularity)) {
+ throw InvalidInput.exception(
+ "projection[%s] has granularity[%s] which must be finer than
or equal to segment granularity[%s]",
+ projection.getName(),
+ projectionGranularity,
+ segmentGranularity
+ );
+ }
+ }
+ }
+
+ final Map<String, Multiset<String>> fields = new TreeMap<>();
+ int position = 0;
+ for (DimensionSchema grouping : projection.getGroupingColumns()) {
+ final String field = grouping.getName();
+ if (Strings.isNullOrEmpty(field)) {
+ throw DruidException
+ .forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Encountered grouping column with null or empty name at
position[%d]", position);
+ }
+ fields.computeIfAbsent(field, k ->
TreeMultiset.create()).add("projection[" + projection.getName() + "] grouping
column list");
+ position++;
+ }
+ for (AggregatorFactory aggregator : projection.getAggregators()) {
+ final String field = aggregator.getName();
+ if (Strings.isNullOrEmpty(field)) {
+ throw DruidException
+ .forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Encountered aggregator with null or empty name at
position[%d]", position);
+ }
+
+ fields.computeIfAbsent(field, k ->
TreeMultiset.create()).add("projection[" + projection.getName() + "]
aggregators list");
+ position++;
+ }
+
+ getFieldsOrThrowIfErrors(fields);
+ }
+ }
+ }
+
+ /**
+ * Helper method that processes a validation result stored as a {@link Map}
of field names to {@link Multiset} of
+ * where they were defined. An error is indicated by the multi-set having
more than a single entry
+ * (such as if a field is defined as both a dimension and an aggregator). If
all fields have only a single entry, this
+ * method returns the list of output field names. If there are duplicates,
this method throws a {@link DruidException}
+ * collecting all validation errors to help indicate where a field is defined
+ *
+ * @see #computeAndValidateOutputFieldNames
+ * @see #validateProjections(List, Granularity)
+ */
+ private static Set<String> getFieldsOrThrowIfErrors(Map<String,
Multiset<String>> validatedFields)
+ {
final List<String> errors = new ArrayList<>();
- for (Map.Entry<String, Multiset<String>> fieldEntry : fields.entrySet()) {
+ for (Map.Entry<String, Multiset<String>> fieldEntry :
validatedFields.entrySet()) {
if
(fieldEntry.getValue().entrySet().stream().mapToInt(Multiset.Entry::getCount).sum()
> 1) {
errors.add(
StringUtils.format(
@@ -440,7 +519,7 @@ public class DataSchema
}
if (errors.isEmpty()) {
- return fields.keySet();
+ return validatedFields.keySet();
} else {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
@@ -448,37 +527,6 @@ public class DataSchema
}
}
- public static void validateProjections(
- @Nullable List<AggregateProjectionSpec> projections,
- @Nullable Granularity segmentGranularity
- )
- {
- if (projections != null) {
- final Set<String> names =
Sets.newHashSetWithExpectedSize(projections.size());
- for (AggregateProjectionSpec projection : projections) {
- if (names.contains(projection.getName())) {
- throw InvalidInput.exception("projection[%s] is already defined,
projection names must be unique", projection.getName());
- }
- names.add(projection.getName());
- final AggregateProjectionMetadata.Schema schema =
projection.toMetadataSchema();
- if (schema.getTimeColumnName() == null) {
- continue;
- }
- final Granularity projectionGranularity =
schema.getEffectiveGranularity();
- if (segmentGranularity != null) {
- if (segmentGranularity.isFinerThan(projectionGranularity)) {
- throw InvalidInput.exception(
- "projection[%s] has granularity[%s] which must be finer than
or equal to segment granularity[%s]",
- projection.getName(),
- projectionGranularity,
- segmentGranularity
- );
- }
- }
- }
- }
- }
-
public static class Builder
{
private String dataSource;
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 64f864a26fd..d810b9c95db 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
@@ -944,4 +945,77 @@ class DataSchemaTest extends InitializedNullHandlingTest
t.getMessage()
);
}
+
+ @Test
+ void testInvalidProjectionDupeGroupingNames()
+ {
+ Throwable t = Assertions.assertThrows(
+ DruidException.class,
+ () -> DataSchema.builder()
+ .withDataSource("dataSource")
+ .withGranularity(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ false,
+ List.of(Intervals.of("2014/2015"))
+ )
+ )
+ .withProjections(
+ List.of(
+ AggregateProjectionSpec.builder("some
projection")
+ .virtualColumns(
+
Granularities.toVirtualColumn(Granularities.HOUR, "g")
+ )
+ .groupingColumns(new
LongDimensionSchema("g"), new StringDimensionSchema("g"))
+ .aggregators(new
CountAggregatorFactory("count"))
+ .build()
+ )
+ )
+ .build()
+ );
+
+ Assertions.assertEquals(
+ "Cannot specify a column more than once: [g] seen in projection[some
projection] grouping column list (2 occurrences)",
+ t.getMessage()
+ );
+ }
+
+ @Test
+ void testInvalidProjectionDupeAggNames()
+ {
+ Throwable t = Assertions.assertThrows(
+ DruidException.class,
+ () -> DataSchema.builder()
+ .withDataSource("dataSource")
+ .withGranularity(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ false,
+ List.of(Intervals.of("2014/2015"))
+ )
+ )
+ .withProjections(
+ List.of(
+ AggregateProjectionSpec.builder("some
projection")
+ .virtualColumns(
+
Granularities.toVirtualColumn(Granularities.HOUR, "g")
+ )
+ .groupingColumns(new
LongDimensionSchema("g"))
+ .aggregators(
+ new
LongSumAggregatorFactory("a0", "added"),
+ new
DoubleSumAggregatorFactory("a0", "added")
+ )
+ .build()
+ )
+ )
+ .build()
+ );
+
+ Assertions.assertEquals(
+ "Cannot specify a column more than once: [a0] seen in projection[some
projection] aggregators list (2 occurrences)",
+ t.getMessage()
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]