This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/37.0.0 by this push:
     new fcef02e6faf refactor: adds StringColumnFormatSpec for string dimension 
configs (#19258)
fcef02e6faf is described below

commit fcef02e6fafae40c3a5f8767278191bd2bc9cf5e
Author: Jay Kanakiya <[email protected]>
AuthorDate: Wed Apr 15 09:10:31 2026 -0700

    refactor: adds StringColumnFormatSpec for string dimension configs (#19258)
---
 .../RabbitStreamIndexTaskTuningConfigTest.java     |   1 +
 .../data/input/impl/StringDimensionSchema.java     |  87 ++++++---
 .../java/org/apache/druid/segment/IndexSpec.java   |  44 ++++-
 .../druid/segment/StringColumnFormatSpec.java      | 212 +++++++++++++++++++++
 .../druid/segment/StringDimensionHandler.java      |   3 +-
 .../druid/segment/StringDimensionIndexer.java      |   4 +-
 .../segment/column/StringBitmapIndexType.java      | 102 ++++++++++
 .../data/input/impl/StringDimensionSchemaTest.java | 168 ++++++++++++++--
 .../org/apache/druid/segment/IndexSpecTest.java    |  19 ++
 .../druid/segment/StringColumnFormatSpecTest.java  | 155 +++++++++++++++
 .../realtime/appenderator/BatchAppenderator.java   |   6 +-
 .../realtime/appenderator/StreamAppenderator.java  |   5 +-
 .../apache/druid/segment/realtime/sink/Sink.java   |  24 ++-
 .../server/compaction/CompactionStatusTest.java    |   4 +-
 14 files changed, 784 insertions(+), 50 deletions(-)

diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java
index 8eee07a747b..19930d1f9d5 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java
@@ -163,6 +163,7 @@ public class RabbitStreamIndexTaskTuningConfigTest
                     "longEncoding=null, " +
                     "complexMetricCompression=null, " +
                     "autoColumnFormatSpec=null, " +
+                    "stringColumnFormatSpec=null, " +
                     "jsonCompression=null, " +
                     "segmentLoader=null" +
                     "}, " +
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/StringDimensionSchema.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/StringDimensionSchema.java
index 018d9ca5c35..20daa347664 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/StringDimensionSchema.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/StringDimensionSchema.java
@@ -23,37 +23,21 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.StringColumnFormatSpec;
 import org.apache.druid.segment.StringDimensionHandler;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nullable;
+import java.util.Objects;
 
 public class StringDimensionSchema extends DimensionSchema
 {
   private static final boolean DEFAULT_CREATE_BITMAP_INDEX = true;
 
   @Nullable
-  public static Integer getDefaultMaxStringLength()
-  {
-    return BuiltInTypesModule.getMaxStringLength();
-  }
-
-  @Nullable
-  private static Integer validateMaxStringLength(String name, @Nullable 
Integer maxStringLength)
-  {
-    if (maxStringLength != null && maxStringLength < 0) {
-      throw DruidException.forPersona(DruidException.Persona.USER)
-                          .ofCategory(DruidException.Category.INVALID_INPUT)
-                          .build("maxStringLength for column [%s] must be >= 
0, got [%s]", name, maxStringLength);
-    }
-    return maxStringLength != null ? maxStringLength : 
getDefaultMaxStringLength();
-  }
-
-  @Nullable
-  private final Integer maxStringLength;
+  private final StringColumnFormatSpec columnFormatSpec;
 
   @JsonCreator
   public static StringDimensionSchema create(String name)
@@ -66,11 +50,11 @@ public class StringDimensionSchema extends DimensionSchema
       @JsonProperty("name") String name,
       @JsonProperty("multiValueHandling") MultiValueHandling 
multiValueHandling,
       @JsonProperty("createBitmapIndex") Boolean createBitmapIndex,
-      @JsonProperty("maxStringLength") @Nullable Integer maxStringLength
+      @JsonProperty("columnFormatSpec") @Nullable StringColumnFormatSpec 
columnFormatSpec
   )
   {
     super(name, multiValueHandling, createBitmapIndex == null ? 
DEFAULT_CREATE_BITMAP_INDEX : createBitmapIndex);
-    this.maxStringLength = validateMaxStringLength(name, maxStringLength);
+    this.columnFormatSpec = columnFormatSpec;
   }
 
   public StringDimensionSchema(
@@ -87,12 +71,29 @@ public class StringDimensionSchema extends DimensionSchema
     this(name, null, DEFAULT_CREATE_BITMAP_INDEX, null);
   }
 
+  @Nullable
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
-  @Nullable
-  public Integer getMaxStringLength()
+  public StringColumnFormatSpec getColumnFormatSpec()
+  {
+    return columnFormatSpec;
+  }
+
+  @Override
+  public DimensionSchema getEffectiveSchema(IndexSpec indexSpec)
   {
-    return maxStringLength;
+    // If there's no per-column or job-level string format config, nothing to 
resolve
+    if (columnFormatSpec == null && indexSpec.getStringColumnFormatSpec() == 
null) {
+      return this;
+    }
+    StringColumnFormatSpec effective =
+        StringColumnFormatSpec.getEffectiveFormatSpec(columnFormatSpec, 
indexSpec);
+    return new StringDimensionSchema(
+        getName(),
+        getMultiValueHandling(),
+        hasBitmapIndex(),
+        effective
+    );
   }
 
   @Override
@@ -117,6 +118,40 @@ public class StringDimensionSchema extends DimensionSchema
   @Override
   public DimensionHandler getDimensionHandler()
   {
-    return new StringDimensionHandler(getName(), getMultiValueHandling(), 
hasBitmapIndex(), false, maxStringLength);
+    MultiValueHandling mvh = getMultiValueHandling();
+    boolean bitmap = hasBitmapIndex();
+    Integer maxStringLength = null;
+    if (columnFormatSpec != null) {
+      if (columnFormatSpec.getMultiValueHandling() != null) {
+        mvh = columnFormatSpec.getMultiValueHandling();
+      }
+      if (columnFormatSpec.getIndexType() != null) {
+        bitmap = columnFormatSpec.getIndexType().hasBitmapIndex();
+      }
+      maxStringLength = columnFormatSpec.getMaxStringLength();
+    }
+    return new StringDimensionHandler(getName(), mvh, bitmap, false, 
maxStringLength);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    StringDimensionSchema that = (StringDimensionSchema) o;
+    return Objects.equals(columnFormatSpec, that.columnFormatSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), columnFormatSpec);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java 
b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
index c6f2a59e0ec..470ba256deb 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
@@ -75,6 +75,8 @@ public class IndexSpec
   @Nullable
   private final NestedCommonFormatColumnFormatSpec autoColumnFormatSpec;
   @Nullable
+  private final StringColumnFormatSpec stringColumnFormatSpec;
+  @Nullable
   private final CompressionStrategy metadataCompression;
 
   /**
@@ -109,6 +111,8 @@ public class IndexSpec
    *                                 used to load the written segment
    * @param autoColumnFormatSpec     specify the default {@link 
NestedCommonFormatColumnFormatSpec} to use for json and
    *                                 auto columns. Defaults to null upon 
calling {@link #getEffectiveSpec()}.
+   * @param stringColumnFormatSpec   specify the default {@link 
StringColumnFormatSpec} to use for string columns.
+   *                                 Defaults to null upon calling {@link 
#getEffectiveSpec()}.
    */
   @JsonCreator
   public IndexSpec(
@@ -121,7 +125,8 @@ public class IndexSpec
       @JsonProperty("complexMetricCompression") @Nullable CompressionStrategy 
complexMetricCompression,
       @Deprecated @JsonProperty("jsonCompression") @Nullable 
CompressionStrategy jsonCompression,
       @JsonProperty("segmentLoader") @Nullable SegmentizerFactory 
segmentLoader,
-      @JsonProperty("autoColumnFormatSpec") @Nullable 
NestedCommonFormatColumnFormatSpec autoColumnFormatSpec
+      @JsonProperty("autoColumnFormatSpec") @Nullable 
NestedCommonFormatColumnFormatSpec autoColumnFormatSpec,
+      @JsonProperty("stringColumnFormatSpec") @Nullable StringColumnFormatSpec 
stringColumnFormatSpec
   )
   {
     this.bitmapSerdeFactory = bitmapSerdeFactory;
@@ -134,6 +139,7 @@ public class IndexSpec
     this.jsonCompression = jsonCompression;
     this.segmentLoader = segmentLoader;
     this.autoColumnFormatSpec = autoColumnFormatSpec;
+    this.stringColumnFormatSpec = stringColumnFormatSpec;
   }
 
   @JsonProperty("bitmap")
@@ -212,6 +218,14 @@ public class IndexSpec
     return autoColumnFormatSpec;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public StringColumnFormatSpec getStringColumnFormatSpec()
+  {
+    return stringColumnFormatSpec;
+  }
+
   /**
    * Populate all null fields of {@link IndexSpec}, first from {@link 
#getDefault()} and finally falling back to hard
    * coded defaults if no overrides are defined.
@@ -298,6 +312,16 @@ public class IndexSpec
       );
     }
 
+    if (stringColumnFormatSpec != null) {
+      bob.withStringColumnFormatSpec(
+          
StringColumnFormatSpec.getEffectiveFormatSpec(stringColumnFormatSpec, this)
+      );
+    } else if (defaultSpec.stringColumnFormatSpec != null) {
+      bob.withStringColumnFormatSpec(
+          
StringColumnFormatSpec.getEffectiveFormatSpec(defaultSpec.stringColumnFormatSpec,
 this)
+      );
+    }
+
     return bob.build();
   }
 
@@ -320,7 +344,8 @@ public class IndexSpec
            Objects.equals(complexMetricCompression, 
indexSpec.complexMetricCompression) &&
            Objects.equals(jsonCompression, indexSpec.jsonCompression) &&
            Objects.equals(segmentLoader, indexSpec.segmentLoader) &&
-           Objects.equals(autoColumnFormatSpec, 
indexSpec.autoColumnFormatSpec);
+           Objects.equals(autoColumnFormatSpec, 
indexSpec.autoColumnFormatSpec) &&
+           Objects.equals(stringColumnFormatSpec, 
indexSpec.stringColumnFormatSpec);
   }
 
   @Override
@@ -336,7 +361,8 @@ public class IndexSpec
         complexMetricCompression,
         jsonCompression,
         segmentLoader,
-        autoColumnFormatSpec
+        autoColumnFormatSpec,
+        stringColumnFormatSpec
     );
   }
 
@@ -352,6 +378,7 @@ public class IndexSpec
            ", longEncoding=" + longEncoding +
            ", complexMetricCompression=" + complexMetricCompression +
            ", autoColumnFormatSpec=" + autoColumnFormatSpec +
+           ", stringColumnFormatSpec=" + stringColumnFormatSpec +
            ", jsonCompression=" + jsonCompression +
            ", segmentLoader=" + segmentLoader +
            '}';
@@ -379,6 +406,8 @@ public class IndexSpec
     private SegmentizerFactory segmentLoader;
     @Nullable
     private NestedCommonFormatColumnFormatSpec autoColumnFormatSpec;
+    @Nullable
+    private StringColumnFormatSpec stringColumnFormatSpec;
 
     public Builder withBitmapSerdeFactory(@Nullable BitmapSerdeFactory 
bitmapSerdeFactory)
     {
@@ -441,6 +470,12 @@ public class IndexSpec
       return this;
     }
 
+    public Builder withStringColumnFormatSpec(@Nullable StringColumnFormatSpec 
stringColumnFormatSpec)
+    {
+      this.stringColumnFormatSpec = stringColumnFormatSpec;
+      return this;
+    }
+
     public IndexSpec build()
     {
       return new IndexSpec(
@@ -453,7 +488,8 @@ public class IndexSpec
           complexMetricCompression,
           jsonCompression,
           segmentLoader,
-          autoColumnFormatSpec
+          autoColumnFormatSpec,
+          stringColumnFormatSpec
       );
     }
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringColumnFormatSpec.java 
b/processing/src/main/java/org/apache/druid/segment/StringColumnFormatSpec.java
new file mode 100644
index 00000000000..6f872ca4983
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringColumnFormatSpec.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.StringBitmapIndexType;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+public class StringColumnFormatSpec
+{
+  private static final StringColumnFormatSpec DEFAULT =
+      builder()
+          
.setIndexType(StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE)
+          
.setMultiValueHandling(DimensionSchema.MultiValueHandling.SORTED_ARRAY)
+          .build();
+
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
+  public static Builder builder(StringColumnFormatSpec spec)
+  {
+    return new Builder(spec);
+  }
+
+  public static StringColumnFormatSpec getEffectiveFormatSpec(
+      @Nullable StringColumnFormatSpec columnFormatSpec,
+      IndexSpec indexSpec
+  )
+  {
+    final Builder builder = columnFormatSpec == null ? builder() : 
builder(columnFormatSpec);
+
+    final StringColumnFormatSpec defaultSpec;
+    if (indexSpec.getStringColumnFormatSpec() != null) {
+      defaultSpec = indexSpec.getStringColumnFormatSpec();
+    } else {
+      defaultSpec = DEFAULT;
+    }
+
+    if (builder.indexType == null) {
+      if (defaultSpec.getIndexType() != null) {
+        builder.setIndexType(defaultSpec.getIndexType());
+      } else {
+        builder.setIndexType(DEFAULT.getIndexType());
+      }
+    }
+
+    if (builder.multiValueHandling == null) {
+      if (defaultSpec.getMultiValueHandling() != null) {
+        builder.setMultiValueHandling(defaultSpec.getMultiValueHandling());
+      } else {
+        builder.setMultiValueHandling(DEFAULT.getMultiValueHandling());
+      }
+    }
+
+    if (builder.maxStringLength == null) {
+      // No DEFAULT fallback needed: null means "no truncation"
+      builder.setMaxStringLength(defaultSpec.getMaxStringLength());
+    }
+
+    return builder.build();
+  }
+
+  @Nullable
+  private final StringBitmapIndexType indexType;
+
+  @Nullable
+  private final DimensionSchema.MultiValueHandling multiValueHandling;
+
+  @Nullable
+  private final Integer maxStringLength;
+
+  @JsonCreator
+  public StringColumnFormatSpec(
+      @JsonProperty("indexType") @Nullable StringBitmapIndexType indexType,
+      @JsonProperty("multiValueHandling") @Nullable 
DimensionSchema.MultiValueHandling multiValueHandling,
+      @JsonProperty("maxStringLength") @Nullable Integer maxStringLength
+  )
+  {
+    if (maxStringLength != null && maxStringLength < 0) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("maxStringLength must be >= 0, got [%s]", 
maxStringLength);
+    }
+    this.indexType = indexType;
+    this.multiValueHandling = multiValueHandling;
+    this.maxStringLength = maxStringLength;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public StringBitmapIndexType getIndexType()
+  {
+    return indexType;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public DimensionSchema.MultiValueHandling getMultiValueHandling()
+  {
+    return multiValueHandling;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Integer getMaxStringLength()
+  {
+    return maxStringLength;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StringColumnFormatSpec that = (StringColumnFormatSpec) o;
+    return Objects.equals(indexType, that.indexType)
+           && multiValueHandling == that.multiValueHandling
+           && Objects.equals(maxStringLength, that.maxStringLength);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(indexType, multiValueHandling, maxStringLength);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "StringColumnFormatSpec{" +
+           "indexType=" + indexType +
+           ", multiValueHandling=" + multiValueHandling +
+           ", maxStringLength=" + maxStringLength +
+           '}';
+  }
+
+  public static class Builder
+  {
+    @Nullable
+    private StringBitmapIndexType indexType;
+    @Nullable
+    private DimensionSchema.MultiValueHandling multiValueHandling;
+    @Nullable
+    private Integer maxStringLength;
+
+    public Builder()
+    {
+    }
+
+    public Builder(StringColumnFormatSpec spec)
+    {
+      this.indexType = spec.indexType;
+      this.multiValueHandling = spec.multiValueHandling;
+      this.maxStringLength = spec.maxStringLength;
+    }
+
+    public Builder setIndexType(@Nullable StringBitmapIndexType indexType)
+    {
+      this.indexType = indexType;
+      return this;
+    }
+
+    public Builder setMultiValueHandling(@Nullable 
DimensionSchema.MultiValueHandling multiValueHandling)
+    {
+      this.multiValueHandling = multiValueHandling;
+      return this;
+    }
+
+    public Builder setMaxStringLength(@Nullable Integer maxStringLength)
+    {
+      this.maxStringLength = maxStringLength;
+      return this;
+    }
+
+    public StringColumnFormatSpec build()
+    {
+      return new StringColumnFormatSpec(indexType, multiValueHandling, 
maxStringLength);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index 0d23fe24aa7..8deb4aca0ed 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -23,6 +23,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -115,7 +116,7 @@ public class StringDimensionHandler implements 
DimensionHandler<Integer, int[],
       boolean hasSpatialIndexes
   )
   {
-    this(dimensionName, multiValueHandling, hasBitmapIndexes, 
hasSpatialIndexes, StringDimensionSchema.getDefaultMaxStringLength());
+    this(dimensionName, multiValueHandling, hasBitmapIndexes, 
hasSpatialIndexes, BuiltInTypesModule.getMaxStringLength());
   }
 
   public StringDimensionHandler(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index 88f60ee8042..8c7a59e2c24 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -24,8 +24,8 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
-import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -68,7 +68,7 @@ public class StringDimensionIndexer extends 
DictionaryEncodedColumnIndexer<int[]
       boolean hasSpatialIndexes
   )
   {
-    this(multiValueHandling, hasBitmapIndexes, hasSpatialIndexes, 
StringDimensionSchema.getDefaultMaxStringLength());
+    this(multiValueHandling, hasBitmapIndexes, hasSpatialIndexes, 
BuiltInTypesModule.getMaxStringLength());
   }
 
   public StringDimensionIndexer(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/StringBitmapIndexType.java
 
b/processing/src/main/java/org/apache/druid/segment/column/StringBitmapIndexType.java
new file mode 100644
index 00000000000..463b174edc8
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/column/StringBitmapIndexType.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Objects;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = 
StringBitmapIndexType.DictionaryEncodedValueIndex.class, name = 
StringBitmapIndexType.TYPE_DICTIONARY),
+    @JsonSubTypes.Type(value = StringBitmapIndexType.NoIndex.class, name = 
StringBitmapIndexType.TYPE_NONE)
+})
+public abstract class StringBitmapIndexType
+{
+  protected static final String TYPE_DICTIONARY = 
"dictionaryEncodedValueIndex";
+  protected static final String TYPE_NONE = "none";
+
+  public abstract boolean hasBitmapIndex();
+
+  public static class DictionaryEncodedValueIndex extends StringBitmapIndexType
+  {
+    public static final DictionaryEncodedValueIndex INSTANCE = new 
DictionaryEncodedValueIndex();
+
+    @Override
+    public boolean hasBitmapIndex()
+    {
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hashCode(getClass());
+    }
+
+    @Override
+    public String toString()
+    {
+      return TYPE_DICTIONARY;
+    }
+  }
+
+  public static class NoIndex extends StringBitmapIndexType
+  {
+    public static final NoIndex INSTANCE = new NoIndex();
+
+    @Override
+    public boolean hasBitmapIndex()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hashCode(getClass());
+    }
+
+    @Override
+    public String toString()
+    {
+      return TYPE_NONE;
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/StringDimensionSchemaTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/StringDimensionSchemaTest.java
index dbee07bddb8..cbdecdcf320 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/StringDimensionSchemaTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/StringDimensionSchemaTest.java
@@ -23,11 +23,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.AnnotationIntrospector;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
-import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.DruidSecondaryModule;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.StringColumnFormatSpec;
+import org.apache.druid.segment.column.StringBitmapIndexType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 public class StringDimensionSchemaTest
 {
@@ -46,30 +48,172 @@ public class StringDimensionSchemaTest
   {
     final String json = "\"dim\"";
     final StringDimensionSchema schema = (StringDimensionSchema) 
jsonMapper.readValue(json, DimensionSchema.class);
-    Assert.assertEquals(new StringDimensionSchema("dim"), schema);
+    Assertions.assertEquals(new StringDimensionSchema("dim"), schema);
   }
 
   @Test
   public void testDeserializeFromJson() throws JsonProcessingException
+  {
+    final String json = "{\n"
+                        + "  \"name\" : \"dim\",\n"
+                        + "  \"multiValueHandling\" : \"SORTED_SET\",\n"
+                        + "  \"createBitmapIndex\" : false\n"
+                        + "}";
+    final StringDimensionSchema schema = (StringDimensionSchema) 
jsonMapper.readValue(json, DimensionSchema.class);
+    Assertions.assertEquals(new StringDimensionSchema("dim", 
MultiValueHandling.SORTED_SET, false), schema);
+  }
+
+  @Test
+  public void testDeserializeFromJsonWithColumnFormatSpec() throws 
JsonProcessingException
   {
     final String json = "{\n"
                         + "  \"name\" : \"dim\",\n"
                         + "  \"multiValueHandling\" : \"SORTED_SET\",\n"
                         + "  \"createBitmapIndex\" : false,\n"
-                        + "  \"maxStringLength\" : 200\n"
+                        + "  \"columnFormatSpec\" : { \"maxStringLength\" : 
200 }\n"
                         + "}";
     final StringDimensionSchema schema = (StringDimensionSchema) 
jsonMapper.readValue(json, DimensionSchema.class);
-    Assert.assertEquals(new StringDimensionSchema("dim", 
MultiValueHandling.SORTED_SET, false), schema);
-    Assert.assertEquals(Integer.valueOf(200), schema.getMaxStringLength());
+    final StringColumnFormatSpec expectedSpec = 
StringColumnFormatSpec.builder()
+        .setMaxStringLength(200)
+        .build();
+    Assertions.assertEquals(
+        new StringDimensionSchema("dim", MultiValueHandling.SORTED_SET, false, 
expectedSpec),
+        schema
+    );
+    Assertions.assertEquals(Integer.valueOf(200), 
schema.getColumnFormatSpec().getMaxStringLength());
   }
 
   @Test
-  public void testInvalidMaxStringLength()
+  public void testGetEffectiveSchemaResolvesMaxStringLengthFromIndexSpec()
   {
-    final Exception exception = Assert.assertThrows(
-        DruidException.class,
-        () -> new StringDimensionSchema("dim", null, true, -1)
+    final StringDimensionSchema schema = new StringDimensionSchema("dim");
+    final IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder().setMaxStringLength(50).build()
+        )
+        .build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    Assertions.assertEquals(Integer.valueOf(50), 
effective.getColumnFormatSpec().getMaxStringLength());
+    Assertions.assertEquals("dim", effective.getName());
+  }
+
+  @Test
+  public void testGetEffectiveSchemaPreservesPerColumnMaxStringLength()
+  {
+    final StringColumnFormatSpec columnSpec = StringColumnFormatSpec.builder()
+        .setMaxStringLength(20)
+        .build();
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
null, true, columnSpec);
+    final IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder().setMaxStringLength(50).build()
+        )
+        .build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    // Per-column maxStringLength=20 should not be overridden by job level 50
+    Assertions.assertEquals(Integer.valueOf(20), 
effective.getColumnFormatSpec().getMaxStringLength());
+  }
+
+  @Test
+  public void testGetEffectiveSchemaPreservesCreateBitmapIndex()
+  {
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
null, false);
+    final IndexSpec indexSpec = IndexSpec.builder().build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    Assertions.assertFalse(effective.hasBitmapIndex());
+  }
+
+  @Test
+  public void testGetEffectiveSchemaPreservesMultiValueHandling()
+  {
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
MultiValueHandling.ARRAY, true);
+    final IndexSpec indexSpec = IndexSpec.builder().build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    // multiValueHandling=ARRAY must not be overridden by the DEFAULT 
(SORTED_ARRAY)
+    Assertions.assertEquals(MultiValueHandling.ARRAY, 
effective.getMultiValueHandling());
+  }
+
+  @Test
+  public void testGetEffectiveSchemaNoChangeWithoutStringColumnFormatSpec()
+  {
+    final StringDimensionSchema schema = new StringDimensionSchema("dim");
+    final IndexSpec indexSpec = IndexSpec.builder().build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    // With no stringColumnFormatSpec, should return same object
+    Assertions.assertSame(schema, effective);
+    Assertions.assertEquals(schema.hasBitmapIndex(), 
effective.hasBitmapIndex());
+    Assertions.assertEquals(schema.getMultiValueHandling(), 
effective.getMultiValueHandling());
+  }
+
+  @Test
+  public void testGetEffectiveSchemaResolvesIndexTypeFromIndexSpec()
+  {
+    final StringDimensionSchema schema = new StringDimensionSchema("dim");
+    final IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder()
+                .setIndexType(StringBitmapIndexType.NoIndex.INSTANCE)
+                .build()
+        )
+        .build();
+
+    final StringDimensionSchema effective = (StringDimensionSchema) 
schema.getEffectiveSchema(indexSpec);
+
+    Assertions.assertEquals(
+        StringBitmapIndexType.NoIndex.INSTANCE,
+        effective.getColumnFormatSpec().getIndexType()
     );
-    Assert.assertTrue(exception.getMessage().contains("maxStringLength for 
column [dim] must be >= 0"));
+  }
+
+  @Test
+  public void testGetDimensionHandlerColumnFormatSpecOverridesParent()
+  {
+    final StringColumnFormatSpec spec = StringColumnFormatSpec.builder()
+        .setIndexType(StringBitmapIndexType.NoIndex.INSTANCE)
+        .setMultiValueHandling(MultiValueHandling.ARRAY)
+        .build();
+    // Parent has SORTED_SET + createBitmapIndex=true, columnFormatSpec has 
ARRAY + NoIndex, formatSpec should be honored
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
MultiValueHandling.SORTED_SET, true, spec);
+
+    Assertions.assertEquals(MultiValueHandling.ARRAY, 
schema.getDimensionHandler().getMultivalueHandling());
+    final DimensionSchema handlerSchema = 
schema.getDimensionHandler().getDimensionSchema(null);
+    Assertions.assertFalse(handlerSchema.hasBitmapIndex());
+  }
+
+  @Test
+  public void 
testGetDimensionHandlerColumnFormatSpecIndexTypeOverridesParentFalse()
+  {
+    final StringColumnFormatSpec spec = StringColumnFormatSpec.builder()
+        
.setIndexType(StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE)
+        .build();
+    // Parent has createBitmapIndex=false, columnFormatSpec has 
DictionaryEncodedValueIndex, formatSpec should be honored
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
null, false, spec);
+    final DimensionSchema handlerSchema = 
schema.getDimensionHandler().getDimensionSchema(null);
+
+    Assertions.assertTrue(handlerSchema.hasBitmapIndex());
+  }
+
+  @Test
+  public void 
testGetDimensionHandlerFallsBackToParentWhenColumnFormatSpecFieldsNull()
+  {
+    final StringColumnFormatSpec spec = StringColumnFormatSpec.builder()
+        .setMaxStringLength(100)
+        .build();
+    // columnFormatSpec has only maxStringLength, and no indexType or 
multiValueHandling, parent values should be used
+    final StringDimensionSchema schema = new StringDimensionSchema("dim", 
MultiValueHandling.SORTED_SET, false, spec);
+
+    Assertions.assertEquals(MultiValueHandling.SORTED_SET, 
schema.getDimensionHandler().getMultivalueHandling());
+    final DimensionSchema handlerSchema = 
schema.getDimensionHandler().getDimensionSchema(null);
+    Assertions.assertFalse(handlerSchema.hasBitmapIndex());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java 
b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
index 663f1b80a24..973aedb938e 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.column.StringBitmapIndexType;
 import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
@@ -69,11 +70,29 @@ public class IndexSpecTest
     Assert.assertEquals(LongEncodingStrategy.LONGS, spec.getLongEncoding());
   }
 
+  @Test
+  public void testSerdeWithStringColumnFormatSpec() throws Exception
+  {
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
+    final String json = "{ \"stringColumnFormatSpec\" : { \"maxStringLength\" 
: 50 } }";
+
+    final IndexSpec spec = objectMapper.readValue(json, IndexSpec.class);
+    Assert.assertNotNull(spec.getStringColumnFormatSpec());
+    Assert.assertEquals(Integer.valueOf(50), 
spec.getStringColumnFormatSpec().getMaxStringLength());
+
+    Assert.assertEquals(spec, 
objectMapper.readValue(objectMapper.writeValueAsBytes(spec), IndexSpec.class));
+  }
+
   @Test
   public void testEquals()
   {
     EqualsVerifier.forClass(IndexSpec.class)
                   .usingGetClass()
+                  .withPrefabValues(
+                      StringBitmapIndexType.class,
+                      
StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE,
+                      StringBitmapIndexType.NoIndex.INSTANCE
+                  )
                   .verify();
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/StringColumnFormatSpecTest.java
 
b/processing/src/test/java/org/apache/druid/segment/StringColumnFormatSpecTest.java
new file mode 100644
index 00000000000..f2d7f80f9f4
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/StringColumnFormatSpecTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.segment.column.StringBitmapIndexType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StringColumnFormatSpecTest
+{
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    StringColumnFormatSpec spec = StringColumnFormatSpec.builder()
+        .setIndexType(StringBitmapIndexType.NoIndex.INSTANCE)
+        .setMultiValueHandling(MultiValueHandling.SORTED_SET)
+        .setMaxStringLength(100)
+        .build();
+
+    StringColumnFormatSpec roundTripped = TestHelper.JSON_MAPPER.readValue(
+        TestHelper.JSON_MAPPER.writeValueAsString(spec),
+        StringColumnFormatSpec.class
+    );
+    Assert.assertEquals(spec, roundTripped);
+  }
+
+  @Test
+  public void testSerdeNullFields() throws JsonProcessingException
+  {
+    StringColumnFormatSpec spec = StringColumnFormatSpec.builder().build();
+
+    String json = TestHelper.JSON_MAPPER.writeValueAsString(spec);
+    Assert.assertEquals("{}", json);
+
+    StringColumnFormatSpec roundTripped = 
TestHelper.JSON_MAPPER.readValue(json, StringColumnFormatSpec.class);
+    Assert.assertEquals(spec, roundTripped);
+  }
+
+  @Test
+  public void testGetEffectiveFormatSpecDefaults()
+  {
+    StringColumnFormatSpec effective = 
StringColumnFormatSpec.getEffectiveFormatSpec(
+        null,
+        IndexSpec.builder().build()
+    );
+
+    
Assert.assertEquals(StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE, 
effective.getIndexType());
+    Assert.assertEquals(MultiValueHandling.SORTED_ARRAY, 
effective.getMultiValueHandling());
+    Assert.assertNull(effective.getMaxStringLength());
+  }
+
+  @Test
+  public void testGetEffectiveFormatSpecJobLevelOverride()
+  {
+    IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder()
+                .setMaxStringLength(50)
+                .build()
+        )
+        .build();
+
+    StringColumnFormatSpec effective = 
StringColumnFormatSpec.getEffectiveFormatSpec(null, indexSpec);
+
+    
Assert.assertEquals(StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE, 
effective.getIndexType());
+    Assert.assertEquals(MultiValueHandling.SORTED_ARRAY, 
effective.getMultiValueHandling());
+    Assert.assertEquals(Integer.valueOf(50), effective.getMaxStringLength());
+  }
+
+  @Test
+  public void testGetEffectiveFormatSpecColumnOverridesJobLevel()
+  {
+    StringColumnFormatSpec columnSpec = StringColumnFormatSpec.builder()
+        .setMaxStringLength(20)
+        .build();
+
+    IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder()
+                .setMaxStringLength(50)
+                .build()
+        )
+        .build();
+
+    StringColumnFormatSpec effective = 
StringColumnFormatSpec.getEffectiveFormatSpec(columnSpec, indexSpec);
+
+    Assert.assertEquals(Integer.valueOf(20), effective.getMaxStringLength());
+  }
+
+  @Test
+  public void testGetEffectiveFormatSpecColumnFallsBackToJobLevel()
+  {
+    StringColumnFormatSpec columnSpec = StringColumnFormatSpec.builder()
+        .setIndexType(StringBitmapIndexType.NoIndex.INSTANCE)
+        .build();
+
+    IndexSpec indexSpec = IndexSpec.builder()
+        .withStringColumnFormatSpec(
+            StringColumnFormatSpec.builder()
+                .setMaxStringLength(50)
+                .setMultiValueHandling(MultiValueHandling.ARRAY)
+                .build()
+        )
+        .build();
+
+    StringColumnFormatSpec effective = 
StringColumnFormatSpec.getEffectiveFormatSpec(columnSpec, indexSpec);
+
+    Assert.assertEquals(StringBitmapIndexType.NoIndex.INSTANCE, 
effective.getIndexType());
+    Assert.assertEquals(MultiValueHandling.ARRAY, 
effective.getMultiValueHandling());
+    Assert.assertEquals(Integer.valueOf(50), effective.getMaxStringLength());
+  }
+
+  @Test
+  public void testInvalidMaxStringLength()
+  {
+    final Exception exception = Assert.assertThrows(
+        Exception.class,
+        () -> StringColumnFormatSpec.builder().setMaxStringLength(-1).build()
+    );
+    Assert.assertTrue(exception.getMessage().contains("maxStringLength must be 
>= 0"));
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(StringColumnFormatSpec.class)
+                  .usingGetClass()
+                  .withPrefabValues(
+                      StringBitmapIndexType.class,
+                      
StringBitmapIndexType.DictionaryEncodedValueIndex.INSTANCE,
+                      StringBitmapIndexType.NoIndex.INSTANCE
+                  )
+                  .verify();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index fb3688f5fa1..453b7ade29e 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -81,6 +81,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -466,7 +467,9 @@ public class BatchAppenderator implements Appenderator
           identifier.getVersion(),
           tuningConfig.getAppendableIndexSpec(),
           tuningConfig.getMaxRowsInMemory(),
-          maxBytesTuningConfig
+          maxBytesTuningConfig,
+          tuningConfig.getIndexSpec(),
+          Collections.emptyList()
       );
       bytesCurrentlyInMemory += calculateSinkMemoryInUsed();
       sinks.put(identifier, retVal);
@@ -1068,6 +1071,7 @@ public class BatchAppenderator implements Appenderator
         tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
         maxBytesTuningConfig,
+        tuningConfig.getIndexSpec(),
         hydrants
     );
     retVal.finishWriting(); // this sink is not writable
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 2835a7752c7..650e8eec25a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -516,7 +516,9 @@ public class StreamAppenderator implements Appenderator
           identifier.getVersion(),
           tuningConfig.getAppendableIndexSpec(),
           tuningConfig.getMaxRowsInMemory(),
-          maxBytesTuningConfig
+          maxBytesTuningConfig,
+          tuningConfig.getIndexSpec(),
+          Collections.emptyList()
       );
       bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(retVal));
 
@@ -1412,6 +1414,7 @@ public class StreamAppenderator implements Appenderator
             tuningConfig.getAppendableIndexSpec(),
             tuningConfig.getMaxRowsInMemory(),
             maxBytesTuningConfig,
+            tuningConfig.getIndexSpec(),
             hydrants
         );
         rowsSoFar += currSink.getNumRows();
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java 
b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 87b2ab9cc52..422a913d1b9 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -26,12 +26,15 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentMapFunction;
@@ -64,6 +67,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
 {
@@ -81,6 +85,7 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
   private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
+  private final IndexSpec indexSpec;
   private final CopyOnWriteArrayList<FireHydrant> hydrants = new 
CopyOnWriteArrayList<>();
 
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
@@ -115,6 +120,7 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
         appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
+        null,
         Collections.emptyList()
     );
   }
@@ -127,6 +133,7 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
       AppendableIndexSpec appendableIndexSpec,
       int maxRowsInMemory,
       long maxBytesInMemory,
+      @Nullable IndexSpec indexSpec,
       List<FireHydrant> hydrants
   )
   {
@@ -137,6 +144,7 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
     this.appendableIndexSpec = appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
+    this.indexSpec = (indexSpec != null ? indexSpec : 
IndexSpec.getDefault()).getEffectiveSpec();
 
     int maxCount = -1;
     for (int i = 0; i < hydrants.size(); ++i) {
@@ -306,11 +314,14 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
 
   private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
   {
+    // Resolve effective dimension schemas up front because column indexers 
need the fully resolved
+    // StringColumnFormatSpec at construction time.
+    final DimensionsSpec dimensionsSpec = 
resolveEffectiveDimensionsSpec(schema.getDimensionsSpec());
     final IncrementalIndexSchema indexSchema = new 
IncrementalIndexSchema.Builder()
         .withMinTimestamp(minTimestamp)
         .withTimestampSpec(schema.getTimestampSpec())
         
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
-        .withDimensionsSpec(schema.getDimensionsSpec())
+        .withDimensionsSpec(dimensionsSpec)
         .withMetrics(schema.getAggregators())
         .withRollup(schema.getGranularitySpec().isRollup())
         .withProjections(schema.getProjections())
@@ -386,6 +397,17 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
     return old;
   }
 
+  private DimensionsSpec resolveEffectiveDimensionsSpec(DimensionsSpec 
dimensionsSpec)
+  {
+    final List<DimensionSchema> effectiveDimensions = 
dimensionsSpec.getDimensions()
+        .stream()
+        .map(dim -> dim.getEffectiveSchema(indexSpec))
+        .collect(Collectors.toList());
+    return DimensionsSpec.builder(dimensionsSpec)
+                         .setDimensions(effectiveDimensions)
+                         .build();
+  }
+
   /**
    * Merge the column from the index with the existing columns.
    */
diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
index cdabf99198b..04cee38e87c 100644
--- 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java
@@ -301,12 +301,12 @@ public class CompactionStatusTest
         + " metadataCompression=none,"
         + " dimensionCompression=lz4, stringDictionaryEncoding=Utf8{},"
         + " metricCompression=lz4, longEncoding=longs, 
complexMetricCompression=null,"
-        + " autoColumnFormatSpec=null, jsonCompression=null, 
segmentLoader=null}], "
+        + " autoColumnFormatSpec=null, stringColumnFormatSpec=null, 
jsonCompression=null, segmentLoader=null}], "
         + "current[IndexSpec{bitmapSerdeFactory=RoaringBitmapSerdeFactory{},"
         + " metadataCompression=none,"
         + " dimensionCompression=zstd, stringDictionaryEncoding=Utf8{},"
         + " metricCompression=lz4, longEncoding=longs, 
complexMetricCompression=null,"
-        + " autoColumnFormatSpec=null, jsonCompression=null, 
segmentLoader=null}]"
+        + " autoColumnFormatSpec=null, stringColumnFormatSpec=null, 
jsonCompression=null, segmentLoader=null}]"
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to