This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c722cf624 fix projection json column fields handling (#18335)
d3c722cf624 is described below

commit d3c722cf624b860a5fde4bfefc90f34b7250b8df
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Aug 1 01:59:24 2025 -0700

    fix projection json column fields handling (#18335)
    
    * fix projection json column fields handling
    
    changes:
    * `AutoTypeColumnMerger` now sets the fields list properly on the projected json 
column serializer via the new `setFieldsAndOpenWriters` method, which sets the 
fields list (to be shared with the parent) and creates and opens field writers for 
the projected fields
    * `NestedDataColumnSerializer` now only writes fields and fields info during 
serialization if it is also writing the dictionaries, so a projected column will not 
serialize fields and fields info, in favor of sharing with the base table column
    * added a test for projected json columns to ensure queries work correctly both 
with and without projections
---
 .../apache/druid/segment/AutoTypeColumnMerger.java |   6 +-
 .../segment/nested/NestedDataColumnSerializer.java | 155 ++++++++++++---------
 .../segment/nested/NestedDataColumnSupplier.java   |   7 +-
 .../druid/segment/CursorFactoryProjectionTest.java |  99 +++++++++++--
 4 files changed, 192 insertions(+), 75 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index 33882d7de9d..8bcc81787e5 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -445,12 +445,16 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
         );
       }
     } else {
-      serializer = new NestedDataColumnSerializer(
+      NestedDataColumnSerializer nestedSerializer = new 
NestedDataColumnSerializer(
           outputName,
           indexSpec,
           segmentWriteOutMedium,
           closer
       );
+      // need to set dictionaries before can open field writers, it is 
harmless that it is set again later
+      nestedSerializer.setDictionaryIdLookup(autoParent.getIdLookup());
+      nestedSerializer.setFieldsAndOpenWriters((NestedDataColumnSerializer) 
autoParent.serializer);
+      serializer = nestedSerializer;
     }
 
     serializer.setDictionaryIdLookup(autoParent.getIdLookup());
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 3dcef1d2d85..619c739cb5a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -183,14 +183,6 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
     return globalDictionaryIdLookup;
   }
 
-  @Override
-  public void setDictionaryIdLookup(DictionaryIdLookup dictionaryIdLookup)
-  {
-    this.globalDictionaryIdLookup = dictionaryIdLookup;
-    this.writeDictionary = false;
-    this.dictionarySerialized = true;
-  }
-
   @Override
   public boolean hasNulls()
   {
@@ -279,54 +271,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
       final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
       fieldsWriter.write(fieldName);
       fieldsInfoWriter.write(field.getValue());
-      final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
-      final ColumnType type = field.getValue().getSingleType();
-      if (type != null) {
-        if (Types.is(type, ValueType.STRING)) {
-          writer = new ScalarStringFieldColumnWriter(
-              name,
-              fieldFileName,
-              segmentWriteOutMedium,
-              indexSpec,
-              globalDictionaryIdLookup
-          );
-        } else if (Types.is(type, ValueType.LONG)) {
-          writer = new ScalarLongFieldColumnWriter(
-              name,
-              fieldFileName,
-              segmentWriteOutMedium,
-              indexSpec,
-              globalDictionaryIdLookup
-          );
-        } else if (Types.is(type, ValueType.DOUBLE)) {
-          writer = new ScalarDoubleFieldColumnWriter(
-              name,
-              fieldFileName,
-              segmentWriteOutMedium,
-              indexSpec,
-              globalDictionaryIdLookup
-          );
-        } else if (Types.is(type, ValueType.ARRAY)) {
-          writer = new VariantArrayFieldColumnWriter(
-              name,
-              fieldFileName,
-              segmentWriteOutMedium,
-              indexSpec,
-              globalDictionaryIdLookup
-          );
-        } else {
-          throw DruidException.defensive("Invalid field type [%s], how did 
this happen?", type);
-        }
-      } else {
-        writer = new VariantFieldColumnWriter(
-            name,
-            fieldFileName,
-            segmentWriteOutMedium,
-            indexSpec,
-            globalDictionaryIdLookup
-        );
-      }
-      writer.open();
+      final GlobalDictionaryEncodedFieldColumnWriter<?> writer = 
openFieldWriter(field, fieldFileName);
       fieldWriters.put(fieldName, writer);
     }
   }
@@ -417,11 +362,13 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
     closeForWrite();
 
     long size = 1 + columnNameBytes.capacity();
-    if (fieldsWriter != null) {
-      size += fieldsWriter.getSerializedSize();
-    }
-    if (fieldsInfoWriter != null) {
-      size += fieldsInfoWriter.getSerializedSize();
+    if (writeDictionary) {
+      if (fieldsWriter != null) {
+        size += fieldsWriter.getSerializedSize();
+      }
+      if (fieldsInfoWriter != null) {
+        size += fieldsInfoWriter.getSerializedSize();
+      }
     }
     // the value dictionaries, raw column, and null index are all stored in 
separate files
     return size;
@@ -437,12 +384,12 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
     if (writeDictionary) {
       Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
     }
-
     writeV0Header(channel, columnNameBytes);
-    fieldsWriter.writeTo(channel, smoosher);
-    fieldsInfoWriter.writeTo(channel, smoosher);
+
 
     if (writeDictionary) {
+      fieldsWriter.writeTo(channel, smoosher);
+      fieldsInfoWriter.writeTo(channel, smoosher);
       if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
         copyFromTempSmoosh(smoosher, 
globalDictionaryIdLookup.getStringBufferMapper());
       } else {
@@ -486,4 +433,84 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
     }
     log.info("Column [%s] serialized successfully with [%d] nested columns.", 
name, fields.size());
   }
+
+  @Override
+  public void setDictionaryIdLookup(DictionaryIdLookup dictionaryIdLookup)
+  {
+    this.globalDictionaryIdLookup = dictionaryIdLookup;
+    this.writeDictionary = false;
+    this.dictionarySerialized = true;
+  }
+
+  public void setFieldsAndOpenWriters(NestedDataColumnSerializer serializer) 
throws IOException
+  {
+    fields = serializer.fields;
+    this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
+    int ctr = 0;
+    for (Map.Entry<String, FieldTypeInfo.MutableTypeSet> field : 
fields.entrySet()) {
+      final String fieldName = field.getKey();
+      final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
+      final GlobalDictionaryEncodedFieldColumnWriter<?> writer = 
openFieldWriter(
+          field,
+          fieldFileName
+      );
+      fieldWriters.put(fieldName, writer);
+    }
+  }
+
+  private GlobalDictionaryEncodedFieldColumnWriter<?> openFieldWriter(
+      Map.Entry<String, FieldTypeInfo.MutableTypeSet> field,
+      String fieldFileName
+  ) throws IOException
+  {
+    final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
+    final ColumnType type = field.getValue().getSingleType();
+    if (type != null) {
+      if (Types.is(type, ValueType.STRING)) {
+        writer = new ScalarStringFieldColumnWriter(
+            name,
+            fieldFileName,
+            segmentWriteOutMedium,
+            indexSpec,
+            globalDictionaryIdLookup
+        );
+      } else if (Types.is(type, ValueType.LONG)) {
+        writer = new ScalarLongFieldColumnWriter(
+            name,
+            fieldFileName,
+            segmentWriteOutMedium,
+            indexSpec,
+            globalDictionaryIdLookup
+        );
+      } else if (Types.is(type, ValueType.DOUBLE)) {
+        writer = new ScalarDoubleFieldColumnWriter(
+            name,
+            fieldFileName,
+            segmentWriteOutMedium,
+            indexSpec,
+            globalDictionaryIdLookup
+        );
+      } else if (Types.is(type, ValueType.ARRAY)) {
+        writer = new VariantArrayFieldColumnWriter(
+            name,
+            fieldFileName,
+            segmentWriteOutMedium,
+            indexSpec,
+            globalDictionaryIdLookup
+        );
+      } else {
+        throw DruidException.defensive("Invalid field type [%s], how did this 
happen?", type);
+      }
+    } else {
+      writer = new VariantFieldColumnWriter(
+          name,
+          fieldFileName,
+          segmentWriteOutMedium,
+          indexSpec,
+          globalDictionaryIdLookup
+      );
+    }
+    writer.open();
+    return writer;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
index 71072dfad99..b3ae98f2eb7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
@@ -75,15 +75,18 @@ public class NestedDataColumnSupplier implements 
Supplier<NestedCommonFormatColu
         final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
         final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
 
-        fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY, 
mapper);
-        fieldInfo = FieldTypeInfo.read(bb, fields.size());
+
 
         if (parent != null) {
+          fields = parent.fields;
+          fieldInfo = parent.fieldInfo;
           stringDictionarySupplier = parent.stringDictionarySupplier;
           longDictionarySupplier = parent.longDictionarySupplier;
           doubleDictionarySupplier = parent.doubleDictionarySupplier;
           arrayDictionarySupplier = parent.arrayDictionarySupplier;
         } else {
+          fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY, 
mapper);
+          fieldInfo = FieldTypeInfo.read(bb, fields.size());
           final ByteBuffer stringDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
               mapper,
               columnName,
diff --git 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 32f695b76b1..44ae7b10a7d 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -74,6 +74,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.junit.AfterClass;
@@ -92,6 +93,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -107,6 +109,7 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
                                                         .add("c", 
ColumnType.LONG)
                                                         .add("d", 
ColumnType.DOUBLE)
                                                         .add("e", 
ColumnType.FLOAT)
+                                                        .add("f", 
ColumnType.NESTED_DATA)
                                                         .build();
 
   public static List<InputRow> makeRows(List<String> dimensions)
@@ -116,49 +119,49 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
             ROW_SIGNATURE,
             TIMESTAMP,
             dimensions,
-            Arrays.asList("a", "aa", 1L, 1.0)
+            Arrays.asList("a", "aa", 1L, 1.0, null, Map.of("x", "a", "y", 1L, 
"z", 1.0))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusMinutes(2),
             dimensions,
-            Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
+            Arrays.asList("a", "bb", 1L, 1.1, 1.1f, Map.of("x", "a", "y", 1L, 
"z", 1.1))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusMinutes(4),
             dimensions,
-            Arrays.asList("a", "cc", 2L, 2.2, 2.2f)
+            Arrays.asList("a", "cc", 2L, 2.2, 2.2f, Map.of("x", "a", "y", 2L, 
"z", 2.2))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusMinutes(6),
             dimensions,
-            Arrays.asList("b", "aa", 3L, 3.3, 3.3f)
+            Arrays.asList("b", "aa", 3L, 3.3, 3.3f, Map.of("x", "b", "y", 3L, 
"z", 3.3))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusMinutes(8),
             dimensions,
-            Arrays.asList("b", "aa", 4L, 4.4, 4.4f)
+            Arrays.asList("b", "aa", 4L, 4.4, 4.4f, Map.of("x", "b", "y", 4L, 
"z", 4.4))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusMinutes(10),
             dimensions,
-            Arrays.asList("b", "bb", 5L, 5.5, 5.5f)
+            Arrays.asList("b", "bb", 5L, 5.5, 5.5f, Map.of("x", "b", "y", 5L, 
"z", 5.5))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusHours(1),
             dimensions,
-            Arrays.asList("a", "aa", 1L, 1.1, 1.1f)
+            Arrays.asList("a", "aa", 1L, 1.1, 1.1f, Map.of("x", "a", "y", 1L, 
"z", 1.1))
         ),
         new ListBasedInputRow(
             ROW_SIGNATURE,
             TIMESTAMP.plusHours(1).plusMinutes(1),
             dimensions,
-            Arrays.asList("a", "dd", 2L, 2.2, 2.2f)
+            Arrays.asList("a", "dd", 2L, 2.2, 2.2f, Map.of("x", "a", "y", 2L, 
"z", 2.2))
         )
     );
   }
@@ -282,6 +285,14 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
           new AggregatorFactory[]{
               new LongSumAggregatorFactory("csum", "c")
           }
+      ),
+      new AggregateProjectionSpec(
+          "json",
+          VirtualColumns.EMPTY,
+          List.of(new AutoTypeColumnSchema("f", null)),
+          new AggregatorFactory[]{
+              new LongSumAggregatorFactory("_c_sum", "c")
+          }
       )
   );
 
@@ -358,6 +369,7 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
                               new LongDimensionSchema("c"),
                               new DoubleDimensionSchema("d"),
                               new FloatDimensionSchema("e"),
+                              new AutoTypeColumnSchema("f", null),
                               new StringDimensionSchema("missing")
                           )
                       );
@@ -1686,6 +1698,77 @@ public class CursorFactoryProjectionTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testProjectionJson()
+  {
+    // test can use the single dimension projection
+    final GroupByQuery.Builder bob =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .setVirtualColumns(
+                        new NestedFieldVirtualColumn(
+                            "f",
+                            "$.x",
+                            "v0",
+                            ColumnType.STRING
+
+                        )
+                    )
+                    .addDimension("v0")
+                    .addAggregator(new LongSumAggregatorFactory("c_sum", "c"));
+
+    final GroupByQuery query = bob.build();
+    final GroupByQuery queryNoProjection = 
bob.setContext(Map.of(QueryContexts.NO_PROJECTIONS, true)).build();
+
+    final CursorBuildSpec buildSpec = 
GroupingEngine.makeCursorBuildSpec(query, null);
+    try (final CursorHolder cursorHolder = 
projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(6, rowCount);
+    }
+
+    final CursorBuildSpec buildSpecNoProjection = 
GroupingEngine.makeCursorBuildSpec(queryNoProjection, null);
+    try (final CursorHolder cursorHolder = 
projectionsCursorFactory.makeCursorHolder(buildSpecNoProjection)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(8, rowCount);
+    }
+
+    final Sequence<ResultRow> resultRows = groupingEngine.process(
+        query,
+        projectionsCursorFactory,
+        projectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> results = resultRows.toList();
+    Assert.assertEquals(2, results.size());
+    Assert.assertArrayEquals(new Object[]{"a", 7L}, results.get(0).getArray());
+    Assert.assertArrayEquals(new Object[]{"b", 12L}, 
results.get(1).getArray());
+
+    final Sequence<ResultRow> resultRowsNoProjection = groupingEngine.process(
+        query,
+        projectionsCursorFactory,
+        projectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> resultsNoProjection = 
resultRowsNoProjection.toList();
+    Assert.assertEquals(2, resultsNoProjection.size());
+    Assert.assertArrayEquals(new Object[]{"a", 7L}, 
resultsNoProjection.get(0).getArray());
+    Assert.assertArrayEquals(new Object[]{"b", 12L}, 
resultsNoProjection.get(1).getArray());
+  }
+
   private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, 
boolean autoSchema, boolean writeNullColumns)
   {
     File tmp = FileUtils.createTempDir();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to