This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d3c722cf624 fix projection json column fields handling (#18335)
d3c722cf624 is described below
commit d3c722cf624b860a5fde4bfefc90f34b7250b8df
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Aug 1 01:59:24 2025 -0700
fix projection json column fields handling (#18335)
* fix projection json column fields handling
changes:
* `AutoTypeColumnMerger` now sets fields list properly on projected json
column serializer via new `setFieldsAndOpenWriters` method which sets the
fields list (to be shared with parent) and creates and opens field writers for
the projected fields
* `NestedDataColumnSerializer` now only writes fields and fields info during
serialization if also writing the dictionaries, so a projected column will not
serialize fields and fields info in favor of sharing with the base table column
* added test for projected json column to ensure both with and without
projections work correctly
---
.../apache/druid/segment/AutoTypeColumnMerger.java | 6 +-
.../segment/nested/NestedDataColumnSerializer.java | 155 ++++++++++++---------
.../segment/nested/NestedDataColumnSupplier.java | 7 +-
.../druid/segment/CursorFactoryProjectionTest.java | 99 +++++++++++--
4 files changed, 192 insertions(+), 75 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index 33882d7de9d..8bcc81787e5 100644
---
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -445,12 +445,16 @@ public class AutoTypeColumnMerger implements
DimensionMergerV9
);
}
} else {
- serializer = new NestedDataColumnSerializer(
+ NestedDataColumnSerializer nestedSerializer = new
NestedDataColumnSerializer(
outputName,
indexSpec,
segmentWriteOutMedium,
closer
);
+ // need to set dictionaries before can open field writers, it is
harmless that it is set again later
+ nestedSerializer.setDictionaryIdLookup(autoParent.getIdLookup());
+ nestedSerializer.setFieldsAndOpenWriters((NestedDataColumnSerializer)
autoParent.serializer);
+ serializer = nestedSerializer;
}
serializer.setDictionaryIdLookup(autoParent.getIdLookup());
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 3dcef1d2d85..619c739cb5a 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -183,14 +183,6 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
return globalDictionaryIdLookup;
}
- @Override
- public void setDictionaryIdLookup(DictionaryIdLookup dictionaryIdLookup)
- {
- this.globalDictionaryIdLookup = dictionaryIdLookup;
- this.writeDictionary = false;
- this.dictionarySerialized = true;
- }
-
@Override
public boolean hasNulls()
{
@@ -279,54 +271,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
fieldsWriter.write(fieldName);
fieldsInfoWriter.write(field.getValue());
- final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
- final ColumnType type = field.getValue().getSingleType();
- if (type != null) {
- if (Types.is(type, ValueType.STRING)) {
- writer = new ScalarStringFieldColumnWriter(
- name,
- fieldFileName,
- segmentWriteOutMedium,
- indexSpec,
- globalDictionaryIdLookup
- );
- } else if (Types.is(type, ValueType.LONG)) {
- writer = new ScalarLongFieldColumnWriter(
- name,
- fieldFileName,
- segmentWriteOutMedium,
- indexSpec,
- globalDictionaryIdLookup
- );
- } else if (Types.is(type, ValueType.DOUBLE)) {
- writer = new ScalarDoubleFieldColumnWriter(
- name,
- fieldFileName,
- segmentWriteOutMedium,
- indexSpec,
- globalDictionaryIdLookup
- );
- } else if (Types.is(type, ValueType.ARRAY)) {
- writer = new VariantArrayFieldColumnWriter(
- name,
- fieldFileName,
- segmentWriteOutMedium,
- indexSpec,
- globalDictionaryIdLookup
- );
- } else {
- throw DruidException.defensive("Invalid field type [%s], how did
this happen?", type);
- }
- } else {
- writer = new VariantFieldColumnWriter(
- name,
- fieldFileName,
- segmentWriteOutMedium,
- indexSpec,
- globalDictionaryIdLookup
- );
- }
- writer.open();
+ final GlobalDictionaryEncodedFieldColumnWriter<?> writer =
openFieldWriter(field, fieldFileName);
fieldWriters.put(fieldName, writer);
}
}
@@ -417,11 +362,13 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
closeForWrite();
long size = 1 + columnNameBytes.capacity();
- if (fieldsWriter != null) {
- size += fieldsWriter.getSerializedSize();
- }
- if (fieldsInfoWriter != null) {
- size += fieldsInfoWriter.getSerializedSize();
+ if (writeDictionary) {
+ if (fieldsWriter != null) {
+ size += fieldsWriter.getSerializedSize();
+ }
+ if (fieldsInfoWriter != null) {
+ size += fieldsInfoWriter.getSerializedSize();
+ }
}
// the value dictionaries, raw column, and null index are all stored in
separate files
return size;
@@ -437,12 +384,12 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
if (writeDictionary) {
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
}
-
writeV0Header(channel, columnNameBytes);
- fieldsWriter.writeTo(channel, smoosher);
- fieldsInfoWriter.writeTo(channel, smoosher);
+
if (writeDictionary) {
+ fieldsWriter.writeTo(channel, smoosher);
+ fieldsInfoWriter.writeTo(channel, smoosher);
if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
copyFromTempSmoosh(smoosher,
globalDictionaryIdLookup.getStringBufferMapper());
} else {
@@ -486,4 +433,84 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
}
log.info("Column [%s] serialized successfully with [%d] nested columns.",
name, fields.size());
}
+
+ @Override
+ public void setDictionaryIdLookup(DictionaryIdLookup dictionaryIdLookup)
+ {
+ this.globalDictionaryIdLookup = dictionaryIdLookup;
+ this.writeDictionary = false;
+ this.dictionarySerialized = true;
+ }
+
+ public void setFieldsAndOpenWriters(NestedDataColumnSerializer serializer)
throws IOException
+ {
+ fields = serializer.fields;
+ this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
+ int ctr = 0;
+ for (Map.Entry<String, FieldTypeInfo.MutableTypeSet> field :
fields.entrySet()) {
+ final String fieldName = field.getKey();
+ final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
+ final GlobalDictionaryEncodedFieldColumnWriter<?> writer =
openFieldWriter(
+ field,
+ fieldFileName
+ );
+ fieldWriters.put(fieldName, writer);
+ }
+ }
+
+ private GlobalDictionaryEncodedFieldColumnWriter<?> openFieldWriter(
+ Map.Entry<String, FieldTypeInfo.MutableTypeSet> field,
+ String fieldFileName
+ ) throws IOException
+ {
+ final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
+ final ColumnType type = field.getValue().getSingleType();
+ if (type != null) {
+ if (Types.is(type, ValueType.STRING)) {
+ writer = new ScalarStringFieldColumnWriter(
+ name,
+ fieldFileName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
+ } else if (Types.is(type, ValueType.LONG)) {
+ writer = new ScalarLongFieldColumnWriter(
+ name,
+ fieldFileName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
+ } else if (Types.is(type, ValueType.DOUBLE)) {
+ writer = new ScalarDoubleFieldColumnWriter(
+ name,
+ fieldFileName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
+ } else if (Types.is(type, ValueType.ARRAY)) {
+ writer = new VariantArrayFieldColumnWriter(
+ name,
+ fieldFileName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
+ } else {
+ throw DruidException.defensive("Invalid field type [%s], how did this
happen?", type);
+ }
+ } else {
+ writer = new VariantFieldColumnWriter(
+ name,
+ fieldFileName,
+ segmentWriteOutMedium,
+ indexSpec,
+ globalDictionaryIdLookup
+ );
+ }
+ writer.open();
+ return writer;
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
index 71072dfad99..b3ae98f2eb7 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
@@ -75,15 +75,18 @@ public class NestedDataColumnSupplier implements
Supplier<NestedCommonFormatColu
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
- fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY,
mapper);
- fieldInfo = FieldTypeInfo.read(bb, fields.size());
+
if (parent != null) {
+ fields = parent.fields;
+ fieldInfo = parent.fieldInfo;
stringDictionarySupplier = parent.stringDictionarySupplier;
longDictionarySupplier = parent.longDictionarySupplier;
doubleDictionarySupplier = parent.doubleDictionarySupplier;
arrayDictionarySupplier = parent.arrayDictionarySupplier;
} else {
+ fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY,
mapper);
+ fieldInfo = FieldTypeInfo.read(bb, fields.size());
final ByteBuffer stringDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 32f695b76b1..44ae7b10a7d 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -74,6 +74,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.AfterClass;
@@ -92,6 +93,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -107,6 +109,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
.add("c",
ColumnType.LONG)
.add("d",
ColumnType.DOUBLE)
.add("e",
ColumnType.FLOAT)
+ .add("f",
ColumnType.NESTED_DATA)
.build();
public static List<InputRow> makeRows(List<String> dimensions)
@@ -116,49 +119,49 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
ROW_SIGNATURE,
TIMESTAMP,
dimensions,
- Arrays.asList("a", "aa", 1L, 1.0)
+ Arrays.asList("a", "aa", 1L, 1.0, null, Map.of("x", "a", "y", 1L,
"z", 1.0))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusMinutes(2),
dimensions,
- Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
+ Arrays.asList("a", "bb", 1L, 1.1, 1.1f, Map.of("x", "a", "y", 1L,
"z", 1.1))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusMinutes(4),
dimensions,
- Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
+ Arrays.asList("a", "cc", 2L, 2.2, 2.2f, Map.of("x", "a", "y", 2L,
"z", 2.2))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusMinutes(6),
dimensions,
- Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
+ Arrays.asList("b", "aa", 3L, 3.3, 3.3f, Map.of("x", "b", "y", 3L,
"z", 3.3))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusMinutes(8),
dimensions,
- Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
+ Arrays.asList("b", "aa", 4L, 4.4, 4.4f, Map.of("x", "b", "y", 4L,
"z", 4.4))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusMinutes(10),
dimensions,
- Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
+ Arrays.asList("b", "bb", 5L, 5.5, 5.5f, Map.of("x", "b", "y", 5L,
"z", 5.5))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusHours(1),
dimensions,
- Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
+ Arrays.asList("a", "aa", 1L, 1.1, 1.1f, Map.of("x", "a", "y", 1L,
"z", 1.1))
),
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP.plusHours(1).plusMinutes(1),
dimensions,
- Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
+ Arrays.asList("a", "dd", 2L, 2.2, 2.2f, Map.of("x", "a", "y", 2L,
"z", 2.2))
)
);
}
@@ -282,6 +285,14 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
new AggregatorFactory[]{
new LongSumAggregatorFactory("csum", "c")
}
+ ),
+ new AggregateProjectionSpec(
+ "json",
+ VirtualColumns.EMPTY,
+ List.of(new AutoTypeColumnSchema("f", null)),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("_c_sum", "c")
+ }
)
);
@@ -358,6 +369,7 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
new LongDimensionSchema("c"),
new DoubleDimensionSchema("d"),
new FloatDimensionSchema("e"),
+ new AutoTypeColumnSchema("f", null),
new StringDimensionSchema("missing")
)
);
@@ -1686,6 +1698,77 @@ public class CursorFactoryProjectionTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void testProjectionJson()
+ {
+ // test can use the single dimension projection
+ final GroupByQuery.Builder bob =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setVirtualColumns(
+ new NestedFieldVirtualColumn(
+ "f",
+ "$.x",
+ "v0",
+ ColumnType.STRING
+
+ )
+ )
+ .addDimension("v0")
+ .addAggregator(new LongSumAggregatorFactory("c_sum", "c"));
+
+ final GroupByQuery query = bob.build();
+ final GroupByQuery queryNoProjection =
bob.setContext(Map.of(QueryContexts.NO_PROJECTIONS, true)).build();
+
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder =
projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(6, rowCount);
+ }
+
+ final CursorBuildSpec buildSpecNoProjection =
GroupingEngine.makeCursorBuildSpec(queryNoProjection, null);
+ try (final CursorHolder cursorHolder =
projectionsCursorFactory.makeCursorHolder(buildSpecNoProjection)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(8, rowCount);
+ }
+
+ final Sequence<ResultRow> resultRows = groupingEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> results = resultRows.toList();
+ Assert.assertEquals(2, results.size());
+ Assert.assertArrayEquals(new Object[]{"a", 7L}, results.get(0).getArray());
+ Assert.assertArrayEquals(new Object[]{"b", 12L},
results.get(1).getArray());
+
+ final Sequence<ResultRow> resultRowsNoProjection = groupingEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> resultsNoProjection =
resultRowsNoProjection.toList();
+ Assert.assertEquals(2, resultsNoProjection.size());
+ Assert.assertArrayEquals(new Object[]{"a", 7L},
resultsNoProjection.get(0).getArray());
+ Assert.assertArrayEquals(new Object[]{"b", 12L},
resultsNoProjection.get(1).getArray());
+ }
+
private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec,
boolean autoSchema, boolean writeNullColumns)
{
File tmp = FileUtils.createTempDir();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]