This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 97623b408c5 add optional 'castToType' parameter to 'auto' column 
schema (#15417)
97623b408c5 is described below

commit 97623b408c52754cfd5a6f97b3b54e1919f6ed18
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Nov 28 17:19:23 2023 -0800

    add optional 'castToType' parameter to 'auto' column schema (#15417)
    
    * auto but.. with an expected type
---
 .../benchmark/query/SqlNestedDataBenchmark.java    |   2 +-
 .../data/input/AvroStreamInputFormatTest.java      |  10 +-
 .../scan/ExternalColumnSelectorFactory.java        |   4 +-
 .../druid/msq/util/DimensionSchemaUtils.java       |   9 +-
 .../druid/msq/util/DimensionSchemaUtilsTest.java   | 236 +++++++++++++++++++++
 .../apache/druid/data/input/orc/OrcReaderTest.java |  20 +-
 .../parquet/NestedColumnParquetReaderTest.java     |  12 +-
 .../input/protobuf/ProtobufInputFormatTest.java    |  16 +-
 .../common/task/CompactionTaskRunTest.java         | 139 ++++++++++++
 .../indexing/input/DruidSegmentReaderTest.java     | 116 +++++++++-
 .../sampler/InputSourceSamplerDiscoveryTest.java   |  42 ++--
 .../overlord/sampler/SamplerResponseTest.java      |   2 +-
 .../druid/data/input/impl/DimensionSchema.java     |   2 +-
 .../org/apache/druid/guice/NestedDataModule.java   |   2 +-
 .../druid/segment/AutoTypeColumnIndexer.java       | 135 +++++++++---
 .../apache/druid/segment/AutoTypeColumnMerger.java |  36 +++-
 .../apache/druid/segment/AutoTypeColumnSchema.java |  58 ++++-
 .../segment/NestedCommonFormatColumnHandler.java   |  21 +-
 .../druid/segment/NestedDataColumnSchema.java      |   2 +-
 .../incremental/AppendableIndexBuilder.java        |   6 -
 .../segment/incremental/IncrementalIndex.java      |   2 +-
 .../segment/nested/NestedCommonFormatColumn.java   |  16 +-
 .../nested/ScalarDoubleColumnSerializer.java       |   3 +-
 .../segment/nested/ScalarLongColumnSerializer.java |   3 +-
 .../segment/nested/VariantColumnSerializer.java    |  22 ++
 .../serde/NestedCommonFormatColumnPartSerde.java   |  31 ++-
 .../apache/druid/query/NestedDataTestUtils.java    |  12 +-
 .../druid/segment/AutoTypeColumnIndexerTest.java   | 105 ++++++++-
 .../druid/segment/filter/BaseFilterTest.java       |  14 +-
 .../druid/segment/generator/DataGeneratorTest.java |   1 -
 .../segment/incremental/IncrementalIndexTest.java  |  61 +++---
 .../nested/NestedDataColumnSupplierTest.java       |   4 +-
 .../nested/ScalarDoubleColumnSupplierTest.java     |   2 +-
 .../nested/ScalarLongColumnSupplierTest.java       |   2 +-
 .../nested/ScalarStringColumnSupplierTest.java     |   2 +-
 .../segment/nested/VariantColumnSupplierTest.java  |   3 +-
 .../sql/calcite/CalciteNestedDataQueryTest.java    |  14 +-
 37 files changed, 973 insertions(+), 194 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index 1915776dbca..4c49f918094 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -298,7 +298,7 @@ public class SqlNestedDataBenchmark
     );
     List<DimensionSchema> dims = ImmutableList.<DimensionSchema>builder()
                                               
.addAll(schemaInfo.getDimensionsSpec().getDimensions())
-                                              .add(new 
AutoTypeColumnSchema("nested"))
+                                              .add(new 
AutoTypeColumnSchema("nested", null))
                                               .build();
     DimensionsSpec dimsSpec = new DimensionsSpec(dims);
 
diff --git 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 817b0c06ab1..a13eb9b5645 100644
--- 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -303,15 +303,15 @@ public class AvroStreamInputFormatTest extends 
InitializedNullHandlingTest
 
     DimensionsSpec dimensionsSpec = new DimensionsSpec(
         ImmutableList.of(
-            new AutoTypeColumnSchema("someIntValueMap"),
-            new AutoTypeColumnSchema("someStringValueMap"),
-            new AutoTypeColumnSchema("someRecord"),
-            new AutoTypeColumnSchema("someRecordArray"),
+            new AutoTypeColumnSchema("someIntValueMap", null),
+            new AutoTypeColumnSchema("someStringValueMap", null),
+            new AutoTypeColumnSchema("someRecord", null),
+            new AutoTypeColumnSchema("someRecordArray", null),
             new LongDimensionSchema("tSomeIntValueMap8"),
             new LongDimensionSchema("tSomeIntValueMap8_2"),
             new StringDimensionSchema("tSomeStringValueMap8"),
             new LongDimensionSchema("tSomeRecordSubLong"),
-            new AutoTypeColumnSchema("tSomeRecordArray0"),
+            new AutoTypeColumnSchema("tSomeRecordArray0", null),
             new StringDimensionSchema("tSomeRecordArray0nestedString")
         )
     );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ExternalColumnSelectorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ExternalColumnSelectorFactory.java
index fc9f59ad32c..ed76066af4e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ExternalColumnSelectorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ExternalColumnSelectorFactory.java
@@ -107,7 +107,7 @@ public class ExternalColumnSelectorFactory implements 
ColumnSelectorFactory
           if (expressionType == null) {
             return delegateDimensionSelector.getObject();
           }
-          return ExprEval.ofType(expressionType, 
delegateDimensionSelector.getObject()).value();
+          return 
ExprEval.bestEffortOf(delegateDimensionSelector.getObject()).castTo(expressionType).value();
         }
         catch (Exception e) {
           throw createException(e, dimensionSpec.getDimension(), inputSource, 
offset);
@@ -211,7 +211,7 @@ public class ExternalColumnSelectorFactory implements 
ColumnSelectorFactory
           if (expressionType == null) {
             return delegateColumnValueSelector.getObject();
           }
-          return ExprEval.ofType(expressionType, 
delegateColumnValueSelector.getObject()).value();
+          return 
ExprEval.bestEffortOf(delegateColumnValueSelector.getObject()).castTo(expressionType).value();
         }
         catch (Exception e) {
           throw createException(e, columnName, inputSource, offset);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java
index e1fbbeb3453..748d411c97c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java
@@ -73,7 +73,10 @@ public class DimensionSchemaUtils
                                     .getDimensionSchema(capabilities);
       }
 
-      return new AutoTypeColumnSchema(column);
+      if (type != null && (type.isPrimitive() || type.isPrimitiveArray())) {
+        return new AutoTypeColumnSchema(column, type);
+      }
+      return new AutoTypeColumnSchema(column, null);
     } else {
       // if schema information is not available, create a string dimension
       if (type == null) {
@@ -102,12 +105,12 @@ public class DimensionSchemaUtils
             return new StringDimensionSchema(column, 
DimensionSchema.MultiValueHandling.ARRAY, null);
           } else {
             // arrayIngestMode == ArrayIngestMode.ARRAY would be true
-            return new AutoTypeColumnSchema(column);
+            return new AutoTypeColumnSchema(column, type);
           }
         } else if (elementType.isNumeric()) {
           // ValueType == LONG || ValueType == FLOAT || ValueType == DOUBLE
           if (arrayIngestMode == ArrayIngestMode.ARRAY) {
-            return new AutoTypeColumnSchema(column);
+            return new AutoTypeColumnSchema(column, type);
           } else {
             throw InvalidInput.exception(
                 "Numeric arrays can only be ingested when '%s' is set to 
'array' in the MSQ query's context. "
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java
new file mode 100644
index 00000000000..a82f5a35f9c
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.util;
+
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.column.ColumnType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DimensionSchemaUtilsTest
+{
+
+  @Test
+  public void testSchemaScalars()
+  {
+    for (ArrayIngestMode mode : ArrayIngestMode.values()) {
+      DimensionSchema dimensionSchema = 
DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.LONG,
+          false,
+          mode
+      );
+      DimensionSchema expected = new LongDimensionSchema("x");
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.DOUBLE,
+          false,
+          mode
+      );
+      expected = new DoubleDimensionSchema("x");
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.FLOAT,
+          false,
+          mode
+      );
+      expected = new FloatDimensionSchema("x");
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.STRING,
+          false,
+          mode
+      );
+      expected = new StringDimensionSchema("x");
+      Assert.assertEquals(expected, dimensionSchema);
+    }
+  }
+
+  @Test
+  public void testSchemaForceAuto()
+  {
+    for (ArrayIngestMode mode : ArrayIngestMode.values()) {
+      DimensionSchema dimensionSchema = 
DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.LONG,
+          true,
+          mode
+      );
+      DimensionSchema expected = new AutoTypeColumnSchema("x", 
ColumnType.LONG);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.DOUBLE,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.DOUBLE);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.FLOAT,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.FLOAT);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.STRING,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.STRING);
+      Assert.assertEquals(expected, dimensionSchema);
+
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.LONG_ARRAY,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.LONG_ARRAY);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.DOUBLE_ARRAY,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.DOUBLE_ARRAY);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.FLOAT_ARRAY,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.FLOAT_ARRAY);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.STRING_ARRAY,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", ColumnType.STRING_ARRAY);
+      Assert.assertEquals(expected, dimensionSchema);
+
+      dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+          "x",
+          ColumnType.NESTED_DATA,
+          true,
+          mode
+      );
+      expected = new AutoTypeColumnSchema("x", null);
+      Assert.assertEquals(expected, dimensionSchema);
+    }
+  }
+
+  @Test
+  public void testSchemaMvdMode()
+  {
+    DimensionSchema dimensionSchema = 
DimensionSchemaUtils.createDimensionSchema(
+        "x",
+        ColumnType.STRING_ARRAY,
+        false,
+        ArrayIngestMode.MVD
+    );
+    DimensionSchema expected = new StringDimensionSchema("x", 
DimensionSchema.MultiValueHandling.ARRAY, null);
+    Assert.assertEquals(expected, dimensionSchema);
+
+    Throwable t = Assert.assertThrows(
+        DruidException.class,
+        () -> DimensionSchemaUtils.createDimensionSchema("x", 
ColumnType.LONG_ARRAY, false, ArrayIngestMode.MVD)
+    );
+    Assert.assertEquals("Numeric arrays can only be ingested when 
'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value 
of the parameter [mvd]", t.getMessage());
+
+    t = Assert.assertThrows(
+        DruidException.class,
+        () -> DimensionSchemaUtils.createDimensionSchema("x", 
ColumnType.DOUBLE_ARRAY, false, ArrayIngestMode.MVD)
+    );
+    Assert.assertEquals("Numeric arrays can only be ingested when 
'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value 
of the parameter [mvd]", t.getMessage());
+
+    t = Assert.assertThrows(
+        DruidException.class,
+        () -> DimensionSchemaUtils.createDimensionSchema("x", 
ColumnType.FLOAT_ARRAY, false, ArrayIngestMode.MVD)
+    );
+    Assert.assertEquals("Numeric arrays can only be ingested when 
'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value 
of the parameter [mvd]", t.getMessage());
+  }
+
+  @Test
+  public void testSchemaArrayMode()
+  {
+    DimensionSchema dimensionSchema = 
DimensionSchemaUtils.createDimensionSchema(
+        "x",
+        ColumnType.STRING_ARRAY,
+        false,
+        ArrayIngestMode.ARRAY
+    );
+    DimensionSchema expected = new AutoTypeColumnSchema("x", 
ColumnType.STRING_ARRAY);
+    Assert.assertEquals(expected, dimensionSchema);
+
+    dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+        "x",
+        ColumnType.LONG_ARRAY,
+        false,
+        ArrayIngestMode.ARRAY
+    );
+    expected = new AutoTypeColumnSchema("x", ColumnType.LONG_ARRAY);
+    Assert.assertEquals(expected, dimensionSchema);
+
+    dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+        "x",
+        ColumnType.DOUBLE_ARRAY,
+        false,
+        ArrayIngestMode.ARRAY
+    );
+    expected = new AutoTypeColumnSchema("x", ColumnType.DOUBLE_ARRAY);
+    Assert.assertEquals(expected, dimensionSchema);
+
+    dimensionSchema = DimensionSchemaUtils.createDimensionSchema(
+        "x",
+        ColumnType.FLOAT_ARRAY,
+        false,
+        ArrayIngestMode.ARRAY
+    );
+    expected = new AutoTypeColumnSchema("x", ColumnType.FLOAT_ARRAY);
+    Assert.assertEquals(expected, dimensionSchema);
+  }
+}
diff --git 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
index 9a9767673f5..c7338e1a28f 100644
--- 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
+++ 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -333,9 +333,9 @@ public class OrcReaderTest extends 
InitializedNullHandlingTest
         new TimestampSpec("ts", "millis", null),
         new DimensionsSpec(
             ImmutableList.of(
-                new AutoTypeColumnSchema("middle"),
-                new AutoTypeColumnSchema("list"),
-                new AutoTypeColumnSchema("map")
+                new AutoTypeColumnSchema("middle", null),
+                new AutoTypeColumnSchema("list", null),
+                new AutoTypeColumnSchema("map", null)
             )
         ),
         inputFormat,
@@ -542,8 +542,8 @@ public class OrcReaderTest extends 
InitializedNullHandlingTest
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(
             ImmutableList.of(
-                new AutoTypeColumnSchema("a"),
-                new AutoTypeColumnSchema("b")
+                new AutoTypeColumnSchema("a", null),
+                new AutoTypeColumnSchema("b", null)
             )
         ),
         inputFormat,
@@ -608,11 +608,11 @@ public class OrcReaderTest extends 
InitializedNullHandlingTest
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(
             ImmutableList.of(
-                new AutoTypeColumnSchema("a"),
-                new AutoTypeColumnSchema("b"),
-                new AutoTypeColumnSchema("c"),
-                new AutoTypeColumnSchema("d"),
-                new AutoTypeColumnSchema("t_d_0")
+                new AutoTypeColumnSchema("a", null),
+                new AutoTypeColumnSchema("b", null),
+                new AutoTypeColumnSchema("c", null),
+                new AutoTypeColumnSchema("d", null),
+                new AutoTypeColumnSchema("t_d_0", null)
             )
         ),
         inputFormat,
diff --git 
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
index e1e6508a187..24205993eb6 100644
--- 
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
+++ 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
@@ -51,8 +51,8 @@ public class NestedColumnParquetReaderTest extends 
BaseParquetReaderTest
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(
             ImmutableList.of(
-                new AutoTypeColumnSchema("nestedData"),
-                new AutoTypeColumnSchema("t_nestedData_listDim"),
+                new AutoTypeColumnSchema("nestedData", null),
+                new AutoTypeColumnSchema("t_nestedData_listDim", null),
                 new StringDimensionSchema("t_nestedData_listDim_string"),
                 new StringDimensionSchema("t_nestedData_dim2"),
                 new LongDimensionSchema("t_nestedData_dim3"),
@@ -105,10 +105,10 @@ public class NestedColumnParquetReaderTest extends 
BaseParquetReaderTest
         new TimestampSpec("timestamp", "auto", null),
         new DimensionsSpec(
             ImmutableList.of(
-                new AutoTypeColumnSchema("a1"),
-                new AutoTypeColumnSchema("a2"),
-                new AutoTypeColumnSchema("t_a2"),
-                new AutoTypeColumnSchema("t_a1_b1"),
+                new AutoTypeColumnSchema("a1", null),
+                new AutoTypeColumnSchema("a2", null),
+                new AutoTypeColumnSchema("t_a2", null),
+                new AutoTypeColumnSchema("t_a1_b1", null),
                 new LongDimensionSchema("t_a1_b1_c1"),
                 new LongDimensionSchema("t_e2_0_b1"),
                 new LongDimensionSchema("tt_a2_0_b1")
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 8fd280ca7e4..0735e80964c 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -271,14 +271,14 @@ public class ProtobufInputFormatTest
             timestampSpec,
             new DimensionsSpec(
                 Lists.newArrayList(
-                    new AutoTypeColumnSchema("event"),
-                    new AutoTypeColumnSchema("id"),
-                    new AutoTypeColumnSchema("someOtherId"),
-                    new AutoTypeColumnSchema("isValid"),
-                    new AutoTypeColumnSchema("eventType"),
-                    new AutoTypeColumnSchema("foo"),
-                    new AutoTypeColumnSchema("bar"),
-                    new AutoTypeColumnSchema("someBytesColumn")
+                    new AutoTypeColumnSchema("event", null),
+                    new AutoTypeColumnSchema("id", null),
+                    new AutoTypeColumnSchema("someOtherId", null),
+                    new AutoTypeColumnSchema("isValid", null),
+                    new AutoTypeColumnSchema("eventType", null),
+                    new AutoTypeColumnSchema("foo", null),
+                    new AutoTypeColumnSchema("bar", null),
+                    new AutoTypeColumnSchema("someBytesColumn", null)
                 )
             ),
             null
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index e78068086f2..78c4f1fabbf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -69,6 +69,7 @@ import 
org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.AutoTypeColumnSchema;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
@@ -76,6 +77,7 @@ import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
@@ -1722,6 +1724,143 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(spatialrows, rowsFromSegment);
   }
 
+  @Test
+  public void testRunWithAutoCastDimensions() throws Exception
+  {
+    final List<String> rows = ImmutableList.of(
+        "2014-01-01T00:00:10Z,a,10,100,1\n",
+        "2014-01-01T00:00:10Z,b,20,110,2\n",
+        "2014-01-01T00:00:10Z,c,30,120,3\n",
+        "2014-01-01T01:00:20Z,a,10,100,1\n",
+        "2014-01-01T01:00:20Z,b,20,110,2\n",
+        "2014-01-01T01:00:20Z,c,30,120,3\n"
+    );
+    final ParseSpec spec = new CSVParseSpec(
+        new TimestampSpec("ts", "auto", null),
+        DimensionsSpec.builder()
+                      .setDimensions(Arrays.asList(
+                          new AutoTypeColumnSchema("ts", ColumnType.STRING),
+                          new AutoTypeColumnSchema("dim", null),
+                          new AutoTypeColumnSchema("x", ColumnType.LONG),
+                          new AutoTypeColumnSchema("y", ColumnType.LONG)
+                      ))
+                      .build(),
+        "|",
+        Arrays.asList("ts", "dim", "x", "y", "val"),
+        false,
+        0
+    );
+    runIndexTask(null, null, spec, rows, false);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final CompactionTask compactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> resultPair = 
runTask(compactionTask);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    final List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(2, segments.size());
+
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
+      Map<String, String> expectedLongSumMetric = new HashMap<>();
+      expectedLongSumMetric.put("name", "val");
+      expectedLongSumMetric.put("type", "longSum");
+      expectedLongSumMetric.put("fieldName", "val");
+      Assert.assertEquals(
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1)),
+              DimensionsSpec.builder()
+                            .setDimensions(Arrays.asList(
+                                // check explicitly specified types are 
preserved
+                                new AutoTypeColumnSchema("ts", 
ColumnType.STRING),
+                                new AutoTypeColumnSchema("dim", null),
+                                new AutoTypeColumnSchema("x", ColumnType.LONG),
+                                new AutoTypeColumnSchema("y", ColumnType.LONG)
+                            ))
+                            .build(),
+              expectedLongSumMetric
+          ),
+          segments.get(i).getLastCompactionState()
+      );
+      if (lockGranularity == LockGranularity.SEGMENT) {
+        Assert.assertEquals(
+            new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
+            segments.get(i).getShardSpec()
+        );
+      } else {
+        Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(i).getShardSpec());
+      }
+    }
+
+    final File cacheDir = temporaryFolder.newFolder();
+    final SegmentCacheManager segmentCacheManager = 
segmentCacheManagerFactory.manufacturate(cacheDir);
+
+    List<String> rowsFromSegment = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
+
+      final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+          new 
QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
+          segment.getInterval()
+      );
+      final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+          null,
+          segment.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+      cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
+        cursor.reset();
+        final ColumnSelectorFactory factory = 
cursor.getColumnSelectorFactory();
+        Assert.assertEquals(ColumnType.STRING, 
factory.getColumnCapabilities("ts").toColumnType());
+        Assert.assertEquals(ColumnType.STRING, 
factory.getColumnCapabilities("dim").toColumnType());
+        Assert.assertEquals(ColumnType.LONG, 
factory.getColumnCapabilities("x").toColumnType());
+        Assert.assertEquals(ColumnType.LONG, 
factory.getColumnCapabilities("y").toColumnType());
+        while (!cursor.isDone()) {
+          final ColumnValueSelector<?> selector1 = 
factory.makeColumnValueSelector("ts");
+          final DimensionSelector selector2 = 
factory.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+          final DimensionSelector selector3 = 
factory.makeDimensionSelector(new DefaultDimensionSpec("x", "x"));
+          final DimensionSelector selector4 = 
factory.makeDimensionSelector(new DefaultDimensionSpec("y", "y"));
+          final DimensionSelector selector5 = 
factory.makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
+
+
+          rowsFromSegment.add(
+              StringUtils.format(
+                  "%s,%s,%s,%s,%s\n",
+                  selector1.getObject(),
+                  selector2.getObject(),
+                  selector3.getObject(),
+                  selector4.getObject(),
+                  selector5.getObject()
+              )
+          );
+
+          cursor.advance();
+        }
+
+        return accumulated;
+      });
+    }
+    Assert.assertEquals(rows, rowsFromSegment);
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
   {
     return runIndexTask(null, null, false);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 09cec378b09..767f2b3ebab 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.loading.NoopSegmentCacheManager;
@@ -693,7 +694,7 @@ public class DruidSegmentReaderTest extends 
InitializedNullHandlingTest
         ImmutableList.of(
             StringDimensionSchema.create("strCol"),
             new DoubleDimensionSchema("dblCol"),
-            new AutoTypeColumnSchema("arrayCol")
+            new AutoTypeColumnSchema("arrayCol", null)
         )
     );
     List<AggregatorFactory> metrics = ImmutableList.of(
@@ -767,7 +768,7 @@ public class DruidSegmentReaderTest extends 
InitializedNullHandlingTest
             ImmutableList.of(
                 StringDimensionSchema.create("strCol"),
                 new DoubleDimensionSchema("dblCol"),
-                new AutoTypeColumnSchema("arrayCol")
+                new AutoTypeColumnSchema("arrayCol", null)
             )
         ),
         ColumnsFilter.all(),
@@ -796,6 +797,117 @@ public class DruidSegmentReaderTest extends 
InitializedNullHandlingTest
 
   }
 
+  @Test
+  public void testArrayColumnsCast() throws IOException
+  {
+    // make our own stuff here so that we don't pollute the shared spec, rows, 
and segment defined in setup and
+    // break all the other tests
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            StringDimensionSchema.create("strCol"),
+            new DoubleDimensionSchema("dblCol"),
+            new AutoTypeColumnSchema("arrayCol", ColumnType.STRING_ARRAY)
+        )
+    );
+    List<AggregatorFactory> metrics = ImmutableList.of(
+        new CountAggregatorFactory("cnt"),
+        new HyperUniquesAggregatorFactory("met_s", "strCol")
+    );
+    final List<InputRow> rows = ImmutableList.of(
+        new MapBasedInputRow(
+            DateTimes.of("2000"),
+            ImmutableList.of("strCol", "dblCol", "arrayCol"),
+            ImmutableMap.<String, Object>builder()
+                        .put("strCol", "foo")
+                        .put("dblCol", 1.23)
+                        .put("arrayCol", ImmutableList.of("a", "b", "c"))
+                        .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.of("2000T01"),
+            ImmutableList.of("strCol", "dblCol", "arrayCol"),
+            ImmutableMap.<String, Object>builder()
+                        .put("strCol", "bar")
+                        .put("dblCol", 4.56)
+                        .put("arrayCol", ImmutableList.of(1L, 2L, 3L))
+                        .build()
+        )
+    );
+
+    InputStats inputStats = new InputStatsImpl();
+    final IncrementalIndex incrementalIndex =
+        IndexBuilder.create()
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withDimensionsSpec(dimensionsSpec)
+                            .withMetrics(metrics.toArray(new 
AggregatorFactory[0]))
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(rows)
+                    .buildIncrementalIndex();
+
+    File segmentDirectory = temporaryFolder.newFolder();
+    long segmentSize;
+    try {
+      TestHelper.getTestIndexMergerV9(
+          OnHeapMemorySegmentWriteOutMediumFactory.instance()
+      ).persist(
+          incrementalIndex,
+          segmentDirectory,
+          IndexSpec.DEFAULT,
+          null
+      );
+      segmentSize = FileUtils.getFileSize(segmentDirectory);
+    }
+    finally {
+      incrementalIndex.close();
+    }
+    InputEntity entity = new BytesCountingInputEntity(
+        makeInputEntity(
+            Intervals.of("2000/P1D"),
+            segmentDirectory,
+            ImmutableList.of("strCol", "dblCol", "arrayCol"),
+            ImmutableList.of("cnt", "met_s")
+        ),
+        inputStats
+    );
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        entity,
+        indexIO,
+        new TimestampSpec("__time", "millis", DateTimes.of("1971")),
+        new DimensionsSpec(
+            ImmutableList.of(
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol"),
+                new AutoTypeColumnSchema("arrayCol", ColumnType.STRING_ARRAY)
+            )
+        ),
+        ColumnsFilter.all(),
+        null,
+        temporaryFolder.newFolder()
+    );
+
+    List<InputRow> readRows = readRows(reader);
+
+    Assert.assertEquals(ImmutableList.of("strCol", "dblCol", "arrayCol"), 
readRows.get(0).getDimensions());
+    Assert.assertEquals(DateTimes.of("2000T").getMillis(), 
readRows.get(0).getTimestampFromEpoch());
+    Assert.assertEquals("foo", readRows.get(0).getRaw("strCol"));
+    Assert.assertEquals(1.23, readRows.get(0).getRaw("dblCol"));
+    Assert.assertArrayEquals(new Object[]{"a", "b", "c"}, (Object[]) 
readRows.get(0).getRaw("arrayCol"));
+    Assert.assertEquals(1L, readRows.get(0).getRaw("cnt"));
+    Assert.assertEquals(makeHLLC("foo"), readRows.get(0).getRaw("met_s"));
+
+    Assert.assertEquals(DateTimes.of("2000T1").getMillis(), 
readRows.get(1).getTimestampFromEpoch());
+    Assert.assertEquals("bar", readRows.get(1).getRaw("strCol"));
+    Assert.assertEquals(4.56, readRows.get(1).getRaw("dblCol"));
+    Assert.assertArrayEquals(new Object[]{"1", "2", "3"}, (Object[]) 
readRows.get(1).getRaw("arrayCol"));
+    Assert.assertEquals(1L, readRows.get(1).getRaw("cnt"));
+    Assert.assertEquals(makeHLLC("bar"), readRows.get(1).getRaw("met_s"));
+
+    Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
+  }
+
   private InputEntity makeInputEntity(final Interval interval)
   {
     return new BytesCountingInputEntity(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerDiscoveryTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerDiscoveryTest.java
index 63949ae54b9..c486c15f0f2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerDiscoveryTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerDiscoveryTest.java
@@ -87,21 +87,21 @@ public class InputSourceSamplerDiscoveryTest extends 
InitializedNullHandlingTest
               new DoubleDimensionSchema("double"),
               new StringDimensionSchema("bool"),
               new StringDimensionSchema("variant"),
-              new AutoTypeColumnSchema("array"),
-              new AutoTypeColumnSchema("nested")
+              new AutoTypeColumnSchema("array", null),
+              new AutoTypeColumnSchema("nested", null)
           ),
           response.getLogicalDimensions()
       );
 
       Assert.assertEquals(
           ImmutableList.of(
-              new AutoTypeColumnSchema("string"),
-              new AutoTypeColumnSchema("long"),
-              new AutoTypeColumnSchema("double"),
-              new AutoTypeColumnSchema("bool"),
-              new AutoTypeColumnSchema("variant"),
-              new AutoTypeColumnSchema("array"),
-              new AutoTypeColumnSchema("nested")
+              new AutoTypeColumnSchema("string", null),
+              new AutoTypeColumnSchema("long", null),
+              new AutoTypeColumnSchema("double", null),
+              new AutoTypeColumnSchema("bool", null),
+              new AutoTypeColumnSchema("variant", null),
+              new AutoTypeColumnSchema("array", null),
+              new AutoTypeColumnSchema("nested", null)
           ),
           response.getPhysicalDimensions()
       );
@@ -152,21 +152,21 @@ public class InputSourceSamplerDiscoveryTest extends 
InitializedNullHandlingTest
             new DoubleDimensionSchema("double"),
             new LongDimensionSchema("bool"),
             new StringDimensionSchema("variant"),
-            new AutoTypeColumnSchema("array"),
-            new AutoTypeColumnSchema("nested")
+            new AutoTypeColumnSchema("array", null),
+            new AutoTypeColumnSchema("nested", null)
         ),
         response.getLogicalDimensions()
     );
 
     Assert.assertEquals(
         ImmutableList.of(
-            new AutoTypeColumnSchema("string"),
-            new AutoTypeColumnSchema("long"),
-            new AutoTypeColumnSchema("double"),
-            new AutoTypeColumnSchema("bool"),
-            new AutoTypeColumnSchema("variant"),
-            new AutoTypeColumnSchema("array"),
-            new AutoTypeColumnSchema("nested")
+            new AutoTypeColumnSchema("string", null),
+            new AutoTypeColumnSchema("long", null),
+            new AutoTypeColumnSchema("double", null),
+            new AutoTypeColumnSchema("bool", null),
+            new AutoTypeColumnSchema("variant", null),
+            new AutoTypeColumnSchema("array", null),
+            new AutoTypeColumnSchema("nested", null)
         ),
         response.getPhysicalDimensions()
     );
@@ -256,9 +256,9 @@ public class InputSourceSamplerDiscoveryTest extends 
InitializedNullHandlingTest
                              new LongDimensionSchema("long"),
                              new DoubleDimensionSchema("double"),
                              new StringDimensionSchema("bool"),
-                             new AutoTypeColumnSchema("variant"),
-                             new AutoTypeColumnSchema("array"),
-                             new AutoTypeColumnSchema("nested")
+                             new AutoTypeColumnSchema("variant", null),
+                             new AutoTypeColumnSchema("array", null),
+                             new AutoTypeColumnSchema("nested", null)
             )
         ).build(),
         null,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
index b8693b86dcd..f552ed076f4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java
@@ -65,7 +65,7 @@ public class SamplerResponseTest
                 new StringDimensionSchema("dim1")
             ),
             ImmutableList.of(
-                new AutoTypeColumnSchema("dim1")
+                new AutoTypeColumnSchema("dim1", null)
             ),
             RowSignature.builder().addTimeColumn().add("dim1", 
ColumnType.STRING).add("met1", ColumnType.LONG).build(),
             data
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java
index 9b692ecb7c5..a65ddb7b9ae 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java
@@ -69,7 +69,7 @@ public abstract class DimensionSchema
         return new DoubleDimensionSchema(name);
       default:
         // the auto column indexer can handle any type
-        return new AutoTypeColumnSchema(name);
+        return new AutoTypeColumnSchema(name, null);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java 
b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java
index 247d83af81e..daaf4ff2f65 100644
--- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java
+++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java
@@ -112,7 +112,7 @@ public class NestedDataModule implements DruidModule
     @Override
     public DimensionHandler<StructuredData, StructuredData, StructuredData> 
get(String dimensionName)
     {
-      return new NestedCommonFormatColumnHandler(dimensionName);
+      return new NestedCommonFormatColumnHandler(dimensionName, null);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
index 799b1293d16..3ccde4221ae 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.math.expr.Evals;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExpressionType;
@@ -77,7 +78,14 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
   protected SortedMap<String, FieldIndexer> fieldIndexers = new TreeMap<>();
   protected final ValueDictionary globalDictionary = new ValueDictionary();
 
-  int estimatedFieldKeySize = 0;
+  protected int estimatedFieldKeySize = 0;
+
+  private final String columnName;
+  @Nullable
+  protected final ColumnType castToType;
+  @Nullable
+  protected final ExpressionType castToExpressionType;
+
 
   protected final StructuredDataProcessor indexerProcessor = new 
StructuredDataProcessor()
   {
@@ -121,6 +129,18 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
     }
   };
 
+  public AutoTypeColumnIndexer(String name, @Nullable ColumnType castToType)
+  {
+    this.columnName = name;
+    if (castToType != null && (castToType.isPrimitive() || 
castToType.isPrimitiveArray())) {
+      this.castToType = castToType;
+      this.castToExpressionType = 
ExpressionType.fromColumnTypeStrict(castToType);
+    } else {
+      this.castToType = null;
+      this.castToExpressionType = null;
+    }
+  }
+
   @Override
   public EncodedKeyComponent<StructuredData> 
processRowValsToUnsortedEncodedKeyComponent(
       @Nullable Object dimValues,
@@ -133,6 +153,52 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
     } else if (isConstant) {
       isConstant = Objects.equals(dimValues, constantValue);
     }
+
+    if (castToExpressionType != null) {
+      return processCast(dimValues);
+    } else {
+      return processAuto(dimValues);
+    }
+  }
+
+  /**
+   * Process values which will all be cast to {@link #castToExpressionType}. 
This method should not be used for,
+   * and does not handle, actual nested data structures; use {@link 
#processAuto(Object)} instead.
+   */
+  private EncodedKeyComponent<StructuredData> processCast(@Nullable Object 
dimValues)
+  {
+    final long oldDictSizeInBytes = globalDictionary.sizeInBytes();
+    final int oldFieldKeySize = estimatedFieldKeySize;
+    ExprEval<?> eval = ExprEval.bestEffortOf(dimValues);
+    try {
+      eval = eval.castTo(castToExpressionType);
+    }
+    catch (IAE invalidCast) {
+      throw new ParseException(eval.asString(), invalidCast, "Cannot coerce 
column [%s] input to requested type [%s]", columnName, castToType);
+    }
+
+    FieldIndexer fieldIndexer = 
fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
+    if (fieldIndexer == null) {
+      estimatedFieldKeySize += 
StructuredDataProcessor.estimateStringSize(NestedPathFinder.JSON_PATH_ROOT);
+      fieldIndexer = new FieldIndexer(globalDictionary);
+      fieldIndexers.put(NestedPathFinder.JSON_PATH_ROOT, fieldIndexer);
+    }
+    StructuredDataProcessor.ProcessedValue<?> rootValue = 
fieldIndexer.processValue(eval);
+    long effectiveSizeBytes = rootValue.getSize();
+    // then, we add the delta of size change to the global dictionaries to 
account for any new space added by the
+    // 'raw' data
+    effectiveSizeBytes += (globalDictionary.sizeInBytes() - 
oldDictSizeInBytes);
+    effectiveSizeBytes += (estimatedFieldKeySize - oldFieldKeySize);
+    return new EncodedKeyComponent<>(StructuredData.wrap(eval.value()), 
effectiveSizeBytes);
+  }
+
+  /**
+   * Process potentially nested data using {@link #indexerProcessor}, a {@link 
StructuredDataProcessor} which visits
+   * all children to catalog values into the {@link #globalDictionary}, 
building a {@link FieldIndexer} along the way
+   * for each primitive or primitive array value encountered.
+   */
+  private EncodedKeyComponent<StructuredData> processAuto(@Nullable Object 
dimValues)
+  {
     final long oldDictSizeInBytes = globalDictionary.sizeInBytes();
     final int oldFieldKeySize = estimatedFieldKeySize;
     final StructuredData data;
@@ -332,6 +398,9 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
 
   public ColumnType getLogicalType()
   {
+    if (castToType != null) {
+      return castToType;
+    }
     if (hasNestedData) {
       return ColumnType.NESTED_DATA;
     }
@@ -370,7 +439,7 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
   @Override
   public ColumnFormat getFormat()
   {
-    return new Format(getLogicalType(), hasNulls);
+    return new Format(getLogicalType(), hasNulls, castToType != null);
   }
 
   @Override
@@ -604,27 +673,31 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
           }
 
           final Object[] theArray = eval.asArray();
-          switch (columnType.getElementType().getType()) {
-            case LONG:
-              typeSet.add(ColumnType.LONG_ARRAY);
-              sizeEstimate = valueDictionary.addLongArray(theArray);
-              return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
-            case DOUBLE:
-              typeSet.add(ColumnType.DOUBLE_ARRAY);
-              sizeEstimate = valueDictionary.addDoubleArray(theArray);
-              return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
-            case STRING:
-              // empty arrays and arrays with all nulls are detected as string 
arrays, but don't count them as part of
-              // the type set yet, we'll handle that later when serializing
-              if (theArray.length == 0 || 
Arrays.stream(theArray).allMatch(Objects::isNull)) {
-                typeSet.addUntypedArray();
-              } else {
-                typeSet.add(ColumnType.STRING_ARRAY);
-              }
-              sizeEstimate = valueDictionary.addStringArray(theArray);
-              return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
-            default:
-              throw new IAE("Unhandled type: %s", columnType);
+          if (theArray == null) {
+            typeSet.addUntypedArray();
+          } else {
+            switch (columnType.getElementType().getType()) {
+              case LONG:
+                typeSet.add(ColumnType.LONG_ARRAY);
+                sizeEstimate = valueDictionary.addLongArray(theArray);
+                return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
+              case DOUBLE:
+                typeSet.add(ColumnType.DOUBLE_ARRAY);
+                sizeEstimate = valueDictionary.addDoubleArray(theArray);
+                return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
+              case STRING:
+                // empty arrays and arrays with all nulls are detected as 
string arrays, but don't count them as part of
+                // the type set yet; we'll handle that later when serializing
+                if (theArray.length == 0 || 
Arrays.stream(theArray).allMatch(Objects::isNull)) {
+                  typeSet.addUntypedArray();
+                } else {
+                  typeSet.add(ColumnType.STRING_ARRAY);
+                }
+                sizeEstimate = valueDictionary.addStringArray(theArray);
+                return new StructuredDataProcessor.ProcessedValue<>(theArray, 
sizeEstimate);
+              default:
+                throw new IAE("Unhandled type: %s", columnType);
+            }
           }
         case STRING:
           typeSet.add(ColumnType.STRING);
@@ -651,11 +724,13 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
   {
     private final ColumnType logicalType;
     private final boolean hasNulls;
+    private final boolean enforceLogicalType;
 
-    Format(ColumnType logicalType, boolean hasNulls)
+    Format(ColumnType logicalType, boolean hasNulls, boolean 
enforceLogicalType)
     {
       this.logicalType = logicalType;
       this.hasNulls = hasNulls;
+      this.enforceLogicalType = enforceLogicalType;
     }
 
     @Override
@@ -667,13 +742,13 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
     @Override
     public DimensionHandler getColumnHandler(String columnName)
     {
-      return new NestedCommonFormatColumnHandler(columnName);
+      return new NestedCommonFormatColumnHandler(columnName, 
enforceLogicalType ? logicalType : null);
     }
 
     @Override
     public DimensionSchema getColumnSchema(String columnName)
     {
-      return new AutoTypeColumnSchema(columnName);
+      return new AutoTypeColumnSchema(columnName, enforceLogicalType ? 
logicalType : null);
     }
 
     @Override
@@ -683,11 +758,11 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
         return this;
       }
       if (otherFormat instanceof Format) {
-        final boolean otherHasNulls = ((Format) otherFormat).hasNulls;
-        if (!getLogicalType().equals(otherFormat.getLogicalType())) {
-          return new Format(ColumnType.NESTED_DATA, hasNulls || otherHasNulls);
+        final Format other = (Format) otherFormat;
+        if (!getLogicalType().equals(other.getLogicalType())) {
+          return new Format(ColumnType.NESTED_DATA, hasNulls || 
other.hasNulls, false);
         }
-        return new Format(logicalType, hasNulls || otherHasNulls);
+        return new Format(logicalType, hasNulls || other.hasNulls, 
enforceLogicalType || other.enforceLogicalType);
       }
       throw new ISE(
           "Cannot merge columns of type[%s] and format[%s] and with [%s] and 
[%s]",
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index 7c978f63fec..5d1198f5460 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -81,10 +81,13 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
   private NestedCommonFormatColumnSerializer serializer;
 
   private ColumnType logicalType;
+  @Nullable
+  private final ColumnType castToType;
   private boolean isVariantType = false;
 
   public AutoTypeColumnMerger(
       String name,
+      @Nullable ColumnType castToType,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
       Closer closer
@@ -92,6 +95,7 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
   {
 
     this.name = name;
+    this.castToType = castToType;
     this.indexSpec = indexSpec;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.closer = closer;
@@ -148,11 +152,17 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
       final FieldTypeInfo.MutableTypeSet rootTypes = 
mergedFields.get(NestedPathFinder.JSON_PATH_ROOT);
       final boolean rootOnly = mergedFields.size() == 1 && rootTypes != null;
 
+      final ColumnType explicitType;
+      if (castToType != null && (castToType.isPrimitive() || 
castToType.isPrimitiveArray())) {
+        explicitType = castToType;
+      } else {
+        explicitType = null;
+      }
 
       // for backwards compat; remove this constant handling in druid 28 along 
with
       // indexSpec.optimizeJsonConstantColumns in favor of always writing 
constant columns
       // we also handle the numMergeIndex == 0 here, which also indicates that 
the column is a null constant
-      if (!forceNested && ((isConstant && constantValue == null) || 
numMergeIndex == 0)) {
+      if (explicitType == null && !forceNested && ((isConstant && 
constantValue == null) || numMergeIndex == 0)) {
         logicalType = ColumnType.STRING;
         serializer = new ScalarStringColumnSerializer(
             name,
@@ -160,8 +170,8 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
             segmentWriteOutMedium,
             closer
         );
-      } else if (!forceNested && rootOnly && rootTypes.getSingleType() != 
null) {
-        logicalType = rootTypes.getSingleType();
+      } else if (explicitType != null || (!forceNested && rootOnly && 
rootTypes.getSingleType() != null)) {
+        logicalType = explicitType != null ? explicitType : 
rootTypes.getSingleType();
         // empty arrays can be missed since they don't have a type, so handle 
them here
         if (!logicalType.isArray() && hasArrays) {
           logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
@@ -194,6 +204,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
           case ARRAY:
             serializer = new VariantColumnSerializer(
                 name,
+                logicalType,
                 null,
                 indexSpec,
                 segmentWriteOutMedium,
@@ -220,6 +231,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
         }
         serializer = new VariantColumnSerializer(
             name,
+            null,
             rootTypes.getByteValue(),
             indexSpec,
             segmentWriteOutMedium,
@@ -339,14 +351,16 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
   {
     ColumnDescriptor.Builder descriptorBuilder = new 
ColumnDescriptor.Builder();
 
-    final NestedCommonFormatColumnPartSerde partSerde = 
NestedCommonFormatColumnPartSerde.serializerBuilder()
-                                                                               
          .withLogicalType(logicalType)
-                                                                               
          .withHasNulls(serializer.hasNulls())
-                                                                               
          .isVariantType(isVariantType)
-                                                                               
          .withByteOrder(ByteOrder.nativeOrder())
-                                                                               
          .withBitmapSerdeFactory(indexSpec.getBitmapSerdeFactory())
-                                                                               
          .withSerializer(serializer)
-                                                                               
          .build();
+    final NestedCommonFormatColumnPartSerde partSerde =
+        NestedCommonFormatColumnPartSerde.serializerBuilder()
+                                         .withLogicalType(logicalType)
+                                         .withHasNulls(serializer.hasNulls())
+                                         .isVariantType(isVariantType)
+                                         .withEnforceLogicalType(castToType != 
null)
+                                         
.withByteOrder(ByteOrder.nativeOrder())
+                                         
.withBitmapSerdeFactory(indexSpec.getBitmapSerdeFactory())
+                                         .withSerializer(serializer)
+                                         .build();
     descriptorBuilder.setValueType(ValueType.COMPLEX) // this doesn't really 
matter... you could say.. its complicated..
                      .setHasMultipleValues(false)
                      .addSerde(partSerde);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnSchema.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnSchema.java
index aff3e673d5f..a72ae37d874 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnSchema.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnSchema.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.segment.column.ColumnType;
@@ -33,6 +34,9 @@ import org.apache.druid.segment.nested.StructuredData;
 import org.apache.druid.segment.nested.VariantColumnSerializer;
 import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
 
+import javax.annotation.Nullable;
+import java.util.Objects;
+
 /**
  * Common {@link DimensionSchema} for ingestion of 'standard' Druid built-in 
{@link ColumnType} datatypes.
  *
@@ -62,12 +66,17 @@ public class AutoTypeColumnSchema extends DimensionSchema
 {
   public static final String TYPE = "auto";
 
+  @Nullable
+  private final ColumnType castToType;
+
   @JsonCreator
   public AutoTypeColumnSchema(
-      @JsonProperty("name") String name
+      @JsonProperty("name") String name,
+      @JsonProperty("castToType") @Nullable ColumnType castToType
   )
   {
     super(name, null, true);
+    this.castToType = castToType;
   }
 
   @Override
@@ -79,12 +88,55 @@ public class AutoTypeColumnSchema extends DimensionSchema
   @Override
   public ColumnType getColumnType()
   {
-    return ColumnType.NESTED_DATA;
+    return castToType != null ? castToType : ColumnType.NESTED_DATA;
+  }
+
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ColumnType getCastToType()
+  {
+    return castToType;
   }
 
   @Override
   public DimensionHandler<StructuredData, StructuredData, StructuredData> 
getDimensionHandler()
   {
-    return new NestedCommonFormatColumnHandler(getName());
+    return new NestedCommonFormatColumnHandler(getName(), castToType);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AutoTypeColumnSchema that = (AutoTypeColumnSchema) o;
+    return Objects.equals(castToType, that.castToType);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), castToType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DimensionSchema{" +
+           "name='" + getName() + '\'' +
+           ", valueType=" + getColumnType() +
+           ", typeName=" + getTypeName() +
+           ", multiValueHandling=" + getMultiValueHandling() +
+           ", createBitmapIndex=" + hasBitmapIndex() +
+           ", castToType=" + castToType +
+           '}';
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
 
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
index d267af8b85e..5e811e22995 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.util.Comparator;
 
 public class NestedCommonFormatColumnHandler implements 
DimensionHandler<StructuredData, StructuredData, StructuredData>
@@ -41,10 +42,13 @@ public class NestedCommonFormatColumnHandler implements 
DimensionHandler<Structu
       );
 
   private final String name;
+  @Nullable
+  private final ColumnType castTo;
 
-  public NestedCommonFormatColumnHandler(String name)
+  public NestedCommonFormatColumnHandler(String name, @Nullable ColumnType 
castTo)
   {
     this.name = name;
+    this.castTo = castTo;
   }
 
   @Override
@@ -56,19 +60,19 @@ public class NestedCommonFormatColumnHandler implements 
DimensionHandler<Structu
   @Override
   public DimensionSpec getDimensionSpec()
   {
-    return new DefaultDimensionSpec(name, name, ColumnType.NESTED_DATA);
+    return new DefaultDimensionSpec(name, name, castTo != null ? castTo : 
ColumnType.NESTED_DATA);
   }
 
   @Override
   public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities)
   {
-    return new AutoTypeColumnSchema(name);
+    return new AutoTypeColumnSchema(name, castTo);
   }
 
   @Override
   public DimensionIndexer<StructuredData, StructuredData, StructuredData> 
makeIndexer(boolean useMaxMemoryEstimates)
   {
-    return new AutoTypeColumnIndexer();
+    return new AutoTypeColumnIndexer(name, castTo);
   }
 
   @Override
@@ -80,7 +84,7 @@ public class NestedCommonFormatColumnHandler implements 
DimensionHandler<Structu
       Closer closer
   )
   {
-    return new AutoTypeColumnMerger(name, indexSpec, segmentWriteOutMedium, 
closer);
+    return new AutoTypeColumnMerger(name, castTo, indexSpec, 
segmentWriteOutMedium, closer);
   }
 
   @Override
@@ -94,6 +98,13 @@ public class NestedCommonFormatColumnHandler implements 
DimensionHandler<Structu
   @Override
   public Comparator<ColumnValueSelector> getEncodedValueSelectorComparator()
   {
+    if (castTo != null) {
+      return (s1, s2) ->
+          castTo.getStrategy().compare(
+              StructuredData.unwrap(s1.getObject()),
+              StructuredData.unwrap(s2.getObject())
+          );
+    }
     return COMPARATOR;
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java
index acde3af49bf..bd1e7bf5724 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java
@@ -93,7 +93,7 @@ public class NestedDataColumnSchema extends DimensionSchema
     if (formatVersion == 4) {
       return new NestedDataColumnHandlerV4(getName());
     } else {
-      return new NestedCommonFormatColumnHandler(getName());
+      return new NestedCommonFormatColumnHandler(getName(), null);
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 1269fe1e6b3..aa739b3f744 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -100,12 +100,6 @@ public abstract class AppendableIndexBuilder
     return this;
   }
 
-  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
-  {
-    this.sortFacts = sortFacts;
-    return this;
-  }
-
   public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
   {
     this.maxRowCount = maxRowCount;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 1f45f202cdb..db72ed37171 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -584,7 +584,7 @@ public abstract class IncrementalIndex implements 
Iterable<Row>, Closeable, Colu
           wasNewDim = true;
           final DimensionHandler<?, ?, ?> handler;
           if (useSchemaDiscovery) {
-            handler = new NestedCommonFormatColumnHandler(dimension);
+            handler = new NestedCommonFormatColumnHandler(dimension, null);
           } else {
             // legacy behavior: for schemaless type discovery, everything is a 
String
             handler = DimensionHandlerUtils.getHandlerFromCapabilities(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumn.java
index abb91fc1483..fe65fc90444 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumn.java
@@ -89,11 +89,13 @@ public interface NestedCommonFormatColumn extends BaseColumn
   {
     private final ColumnType logicalType;
     private final boolean hasNulls;
+    private final boolean enforceLogicalType;
 
-    public Format(ColumnType logicalType, boolean hasNulls)
+    public Format(ColumnType logicalType, boolean hasNulls, boolean 
enforceLogicalType)
     {
       this.logicalType = logicalType;
       this.hasNulls = hasNulls;
+      this.enforceLogicalType = enforceLogicalType;
     }
 
     @Override
@@ -105,13 +107,13 @@ public interface NestedCommonFormatColumn extends 
BaseColumn
     @Override
     public DimensionHandler getColumnHandler(String columnName)
     {
-      return new NestedCommonFormatColumnHandler(columnName);
+      return new NestedCommonFormatColumnHandler(columnName, 
enforceLogicalType ? logicalType : null);
     }
 
     @Override
     public DimensionSchema getColumnSchema(String columnName)
     {
-      return new AutoTypeColumnSchema(columnName);
+      return new AutoTypeColumnSchema(columnName, enforceLogicalType ? 
logicalType : null);
     }
 
     @Override
@@ -122,11 +124,11 @@ public interface NestedCommonFormatColumn extends 
BaseColumn
       }
 
       if (otherFormat instanceof Format) {
-        final boolean otherHasNulls = ((Format) otherFormat).hasNulls;
-        if (!getLogicalType().equals(otherFormat.getLogicalType())) {
-          return new Format(ColumnType.NESTED_DATA, hasNulls || otherHasNulls);
+        final Format other = (Format) otherFormat;
+        if (!getLogicalType().equals(other.getLogicalType())) {
+          return new Format(ColumnType.NESTED_DATA, hasNulls || 
other.hasNulls, false);
         }
-        return new Format(logicalType, hasNulls || otherHasNulls);
+        return new Format(logicalType, hasNulls || other.hasNulls, 
enforceLogicalType || other.enforceLogicalType);
       }
       throw new ISE(
           "Cannot merge columns of type[%s] and format[%s] and with [%s] and 
[%s]",
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index ef99d5a7331..e077282f98f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarDoublesSerializer;
@@ -56,7 +57,7 @@ public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumn
   @Override
   protected int processValue(@Nullable Object rawValue) throws IOException
   {
-    final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+    final ExprEval<?> eval = 
ExprEval.bestEffortOf(rawValue).castTo(ExpressionType.DOUBLE);
     final double val = eval.asDouble();
     final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupDouble(val);
     doublesSerializer.add(dictId == 0 ? 0.0 : val);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index 0af923ced99..bfb966365e2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarLongsSerializer;
@@ -56,7 +57,7 @@ public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSe
   @Override
   protected int processValue(@Nullable Object rawValue) throws IOException
   {
-    final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+    final ExprEval<?> eval = 
ExprEval.bestEffortOf(rawValue).castTo(ExpressionType.LONG);
 
     final long val = eval.asLong();
     final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupLong(val);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index f342effb768..58464b2c9e6 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -27,6 +27,7 @@ import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
@@ -34,6 +35,7 @@ import 
org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
@@ -81,10 +83,13 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   private ByteBuffer columnNameBytes = null;
   private boolean hasNulls;
   @Nullable
+  private final ExpressionType expectedExpressionType;
+  @Nullable
   private final Byte variantTypeSetByte;
 
   public VariantColumnSerializer(
       String name,
+      @Nullable ColumnType logicalType,
       @Nullable Byte variantTypeSetByte,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
@@ -92,6 +97,7 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   )
   {
     this.name = name;
+    this.expectedExpressionType = logicalType != null ? 
ExpressionType.fromColumnTypeStrict(logicalType) : null;
     this.variantTypeSetByte = variantTypeSetByte;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.indexSpec = indexSpec;
@@ -228,8 +234,24 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
     }
 
     ExprEval eval = 
ExprEval.bestEffortOf(StructuredData.unwrap(selector.getObject()));
+    if (expectedExpressionType != null) {
+      try {
+        eval = eval.castTo(expectedExpressionType);
+      }
+      catch (IAE invalidCast) {
+        // write null
+        intermediateValueWriter.write(0);
+        hasNulls = true;
+        return;
+      }
+    }
     if (eval.isArray()) {
       Object[] array = eval.asArray();
+      if (array == null) {
+        intermediateValueWriter.write(0);
+        hasNulls = true;
+        return;
+      }
       int[] globalIds = new int[array.length];
       for (int i = 0; i < array.length; i++) {
         if (array[i] == null) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
 
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
index f33d820d3b5..5e3d74d0d95 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/serde/NestedCommonFormatColumnPartSerde.java
@@ -78,6 +78,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       @JsonProperty("logicalType") ColumnType logicalType,
       @JsonProperty("hasNulls") boolean hasNulls,
       @JsonProperty("isVariantType") boolean isVariantType,
+      @JsonProperty("enforceLogicalType") boolean enforceLogicalType,
       @JsonProperty("byteOrder") ByteOrder byteOrder,
       @JsonProperty("bitmapSerdeFactory") BitmapSerdeFactory bitmapSerdeFactory
   )
@@ -86,6 +87,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
         logicalType,
         hasNulls,
         isVariantType,
+        enforceLogicalType,
         byteOrder,
         bitmapSerdeFactory,
         null
@@ -95,6 +97,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
   private final ColumnType logicalType;
   private final boolean hasNulls;
   private final boolean isVariantType;
+  private final boolean enforceLogicalType;
   private final ByteOrder byteOrder;
   private final BitmapSerdeFactory bitmapSerdeFactory;
 
@@ -106,6 +109,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       ColumnType logicalType,
       boolean hasNulls,
       boolean isVariant,
+      boolean enforceLogicalType,
       ByteOrder byteOrder,
       BitmapSerdeFactory bitmapSerdeFactory,
       @Nullable Serializer serializer
@@ -114,6 +118,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
     this.logicalType = logicalType;
     this.hasNulls = hasNulls;
     this.isVariantType = isVariant;
+    this.enforceLogicalType = enforceLogicalType;
     this.byteOrder = byteOrder;
     this.bitmapSerdeFactory = bitmapSerdeFactory;
     this.serializer = serializer;
@@ -163,6 +168,12 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
     return isVariantType;
   }
 
+  @JsonProperty("enforceLogicalType")
+  public boolean enforceLogicalType()
+  {
+    return enforceLogicalType;
+  }
+
   @JsonProperty
   public ByteOrder getByteOrder()
   {
@@ -194,7 +205,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       builder.setType(logicalType);
       builder.setNestedCommonFormatColumnSupplier(supplier);
       builder.setIndexSupplier(supplier, true, false);
-      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue()));
+      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue(), enforceLogicalType));
     }
   }
 
@@ -216,7 +227,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       builder.setType(logicalType);
       builder.setNestedCommonFormatColumnSupplier(supplier);
       builder.setIndexSupplier(supplier, true, false);
-      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue()));
+      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue(), enforceLogicalType));
     }
   }
 
@@ -238,7 +249,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       builder.setType(logicalType);
       builder.setNestedCommonFormatColumnSupplier(supplier);
       builder.setIndexSupplier(supplier, true, false);
-      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue()));
+      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
capabilitiesBuilder.hasNulls().isTrue(), enforceLogicalType));
     }
   }
 
@@ -268,7 +279,8 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       builder.setNestedCommonFormatColumnSupplier(supplier);
       builder.setColumnFormat(new NestedCommonFormatColumn.Format(
           logicalType,
-          capabilitiesBuilder.hasNulls().isTrue()
+          capabilitiesBuilder.hasNulls().isTrue(),
+          enforceLogicalType
       ));
     }
   }
@@ -295,7 +307,7 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       ColumnType logicalType = simpleType == null ? ColumnType.NESTED_DATA : 
simpleType;
       builder.setType(logicalType);
       builder.setNestedCommonFormatColumnSupplier(supplier);
-      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
hasNulls));
+      builder.setColumnFormat(new NestedCommonFormatColumn.Format(logicalType, 
hasNulls, enforceLogicalType));
     }
   }
 
@@ -304,6 +316,8 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
     private ColumnType logicalType = ColumnType.NESTED_DATA;
     private boolean hasNulls;
     private boolean isVariantType;
+    private boolean enforceLogicalType;
+
     private ByteOrder byteOrder = ByteOrder.nativeOrder();
     BitmapSerdeFactory bitmapSerdeFactory = 
RoaringBitmapSerdeFactory.getInstance();
     @Nullable
@@ -345,12 +359,19 @@ public class NestedCommonFormatColumnPartSerde implements 
ColumnPartSerde
       return this;
     }
 
+    public SerializerBuilder withEnforceLogicalType(boolean enforceLogicalType)
+    {
+      this.enforceLogicalType = enforceLogicalType;
+      return this;
+    }
+
     public NestedCommonFormatColumnPartSerde build()
     {
       return new NestedCommonFormatColumnPartSerde(
           logicalType,
           hasNulls,
           isVariantType,
+          enforceLogicalType,
           byteOrder,
           bitmapSerdeFactory,
           serializer
diff --git 
a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java 
b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java
index e5de6343735..1ff564106c5 100644
--- a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java
+++ b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java
@@ -105,12 +105,12 @@ public class NestedDataTestUtils
       DimensionsSpec.builder()
                     .setDimensions(
                         Arrays.asList(
-                            new AutoTypeColumnSchema("dim"),
-                            new AutoTypeColumnSchema("nest_json"),
-                            new AutoTypeColumnSchema("nester_json"),
-                            new AutoTypeColumnSchema("variant_json"),
-                            new AutoTypeColumnSchema("list_json"),
-                            new AutoTypeColumnSchema("nonexistent")
+                            new AutoTypeColumnSchema("dim", null),
+                            new AutoTypeColumnSchema("nest_json", null),
+                            new AutoTypeColumnSchema("nester_json", null),
+                            new AutoTypeColumnSchema("variant_json", null),
+                            new AutoTypeColumnSchema("list_json", null),
+                            new AutoTypeColumnSchema("nonexistent", null)
                         )
                     )
                     .build();
diff --git 
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
index fe8900c0dd8..6d2d29806df 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/AutoTypeColumnIndexerTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
@@ -67,7 +68,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
   @Test
   public void testKeySizeEstimation()
   {
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
     Assert.assertEquals(baseCardinality, indexer.getCardinality());
 
@@ -198,6 +199,8 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(1, dimensionSelector.getRow().size());
     
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
     Assert.assertNull(dimensionSelector.getObject());
+
+    Assert.assertEquals(ColumnType.STRING, 
storageAdapter.getColumnCapabilities(STRING_COL).toColumnType());
   }
 
   @Test
@@ -294,6 +297,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
       );
       Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), 
dimensionSelector.getObject());
     }
+    Assert.assertEquals(ColumnType.LONG, 
storageAdapter.getColumnCapabilities(LONG_COL).toColumnType());
   }
 
   @Test
@@ -389,6 +393,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
       );
       Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), 
dimensionSelector.getObject());
     }
+    Assert.assertEquals(ColumnType.DOUBLE, 
storageAdapter.getColumnCapabilities(DOUBLE_COL).toColumnType());
   }
 
   @Test
@@ -454,6 +459,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
         () -> 
cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
     );
     Assert.assertNull(valueSelector.getObject());
+    Assert.assertEquals(ColumnType.STRING_ARRAY, 
storageAdapter.getColumnCapabilities(STRING_ARRAY_COL).toColumnType());
   }
 
   @Test
@@ -512,6 +518,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     dimensionSelector = 
cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
     Assert.assertNull(valueSelector.getObject());
     Assert.assertNull(dimensionSelector.getObject());
+    Assert.assertEquals(ColumnType.STRING, 
storageAdapter.getColumnCapabilities(VARIANT_COL).toColumnType());
   }
 
   @Test
@@ -579,13 +586,91 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
         () -> 
cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
     );
     Assert.assertNull(valueSelector.getObject());
+    Assert.assertEquals(ColumnType.NESTED_DATA, 
storageAdapter.getColumnCapabilities(NESTED_COL).toColumnType());
+  }
+
+  @Test
+  public void testNestedColumnIndexerSchemaDiscoveryTypeCoercion() throws 
IndexSizeExceededException
+  {
+    // coerce nested column to STRING type, throwing parse exceptions for 
nested data
+    // and casting anything else to string
+    long minTimestamp = System.currentTimeMillis();
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema(
+                minTimestamp,
+                new TimestampSpec(TIME_COL, "millis", null),
+                Granularities.NONE,
+                VirtualColumns.EMPTY,
+                DimensionsSpec.builder()
+                              .setDimensions(ImmutableList.of(new 
AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING)))
+                              .useSchemaDiscovery(true)
+                              .build(),
+                new AggregatorFactory[0],
+                false
+            )
+        )
+        .setMaxRowCount(1000)
+        .build();
+
+    index.add(makeInputRow(minTimestamp + 1, true, NESTED_COL, "a"));
+    index.add(makeInputRow(minTimestamp + 2, true, NESTED_COL, 2L));
+    IncrementalIndexAddResult result = index.add(makeInputRow(minTimestamp + 
3, true, NESTED_COL, ImmutableMap.of("x", 1.1, "y", 2L)));
+    Assert.assertTrue(result.hasParseException());
+    index.add(makeInputRow(minTimestamp + 4, true, NESTED_COL, null));
+    index.add(makeInputRow(minTimestamp + 5, false, NESTED_COL, null));
+
+    IncrementalIndexStorageAdapter storageAdapter = new 
IncrementalIndexStorageAdapter(index);
+    Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
+        null,
+        Intervals.ETERNITY,
+        VirtualColumns.EMPTY,
+        Granularities.NONE,
+        false,
+        null
+    );
+    final DimensionSpec dimensionSpec = new DefaultDimensionSpec(NESTED_COL, 
NESTED_COL, ColumnType.STRING);
+    List<Cursor> cursorList = cursorSequence.toList();
+    ColumnSelectorFactory columnSelectorFactory = 
cursorList.get(0).getColumnSelectorFactory();
+
+    ColumnValueSelector valueSelector = 
columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
+    DimensionSelector dimensionSelector = 
cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
+    Assert.assertEquals("a", valueSelector.getObject());
+    Assert.assertEquals("a", dimensionSelector.getObject());
+
+    columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
+    valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
+    dimensionSelector = 
cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
+    Assert.assertEquals("2", valueSelector.getObject());
+    Assert.assertFalse(valueSelector.isNull());
+    Assert.assertEquals("2", dimensionSelector.getObject());
+
+    columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
+    valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
+    dimensionSelector = 
cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
+    Assert.assertNull(valueSelector.getObject());
+    Assert.assertNull(dimensionSelector.getObject());
+
+    columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
+    valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
+    dimensionSelector = 
cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
+    Assert.assertNull(valueSelector.getObject());
+    Assert.assertNull(dimensionSelector.getObject());
+
+    columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
+    valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
+    dimensionSelector = 
cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
+    Assert.assertNull(valueSelector.getObject());
+    Assert.assertNull(dimensionSelector.getObject());
+
+    Assert.assertEquals(ColumnType.STRING, 
storageAdapter.getColumnCapabilities(NESTED_COL).toColumnType());
   }
 
   @Test
   public void testConstantNull()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = indexer.processRowValsToUnsortedEncodedKeyComponent(null, true);
@@ -603,13 +688,14 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertTrue(indexer.hasNulls);
     Assert.assertFalse(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.STRING, indexer.getLogicalType());
   }
 
   @Test
   public void testConstantString()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = indexer.processRowValsToUnsortedEncodedKeyComponent("abcd", true);
@@ -626,13 +712,14 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertFalse(indexer.hasNulls);
     Assert.assertFalse(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.STRING, indexer.getLogicalType());
   }
 
   @Test
   public void testConstantLong()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = indexer.processRowValsToUnsortedEncodedKeyComponent(1234L, true);
@@ -649,13 +736,14 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertFalse(indexer.hasNulls);
     Assert.assertFalse(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.LONG, indexer.getLogicalType());
   }
 
   @Test
   public void testConstantEmptyArray()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = 
indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(), true);
@@ -672,13 +760,14 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertFalse(indexer.hasNulls);
     Assert.assertFalse(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.NESTED_DATA, indexer.getLogicalType());
   }
 
   @Test
   public void testConstantArray()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = 
indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 
3L), true);
@@ -695,13 +784,14 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertFalse(indexer.hasNulls);
     Assert.assertFalse(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.LONG_ARRAY, indexer.getLogicalType());
   }
 
   @Test
   public void testConstantEmptyObject()
   {
     int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
-    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+    AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
     EncodedKeyComponent<StructuredData> key;
 
     key = 
indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of(), true);
@@ -718,6 +808,7 @@ public class AutoTypeColumnIndexerTest extends 
InitializedNullHandlingTest
     Assert.assertFalse(indexer.hasNulls);
     Assert.assertTrue(indexer.hasNestedData);
     Assert.assertTrue(indexer.isConstant());
+    Assert.assertEquals(ColumnType.NESTED_DATA, indexer.getLogicalType());
   }
 
   @Nonnull
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
index 723bdd07f06..a4a45c2355d 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
@@ -166,10 +166,10 @@ public abstract class BaseFilterTest extends 
InitializedNullHandlingTest
                    .add(new DoubleDimensionSchema("d0"))
                    .add(new FloatDimensionSchema("f0"))
                    .add(new LongDimensionSchema("l0"))
-                   .add(new AutoTypeColumnSchema("arrayString"))
-                   .add(new AutoTypeColumnSchema("arrayLong"))
-                   .add(new AutoTypeColumnSchema("arrayDouble"))
-                   .add(new AutoTypeColumnSchema("variant"))
+                   .add(new AutoTypeColumnSchema("arrayString", 
ColumnType.STRING_ARRAY))
+                   .add(new AutoTypeColumnSchema("arrayLong", 
ColumnType.LONG_ARRAY))
+                   .add(new AutoTypeColumnSchema("arrayDouble", 
ColumnType.DOUBLE_ARRAY))
+                   .add(new AutoTypeColumnSchema("variant", null))
                    .build()
   );
 
@@ -441,7 +441,7 @@ public abstract class BaseFilterTest extends 
InitializedNullHandlingTest
                                                 .getDimensions()
                                                 .stream()
                                                 .map(
-                                                    dimensionSchema -> new 
AutoTypeColumnSchema(dimensionSchema.getName())
+                                                    dimensionSchema -> new 
AutoTypeColumnSchema(dimensionSchema.getName(), null)
                                                 )
                                                 .collect(Collectors.toList())
                                       ),
@@ -469,7 +469,7 @@ public abstract class BaseFilterTest extends 
InitializedNullHandlingTest
                                                 .getDimensions()
                                                 .stream()
                                                 .map(
-                                                    dimensionSchema -> new 
AutoTypeColumnSchema(dimensionSchema.getName())
+                                                    dimensionSchema -> new 
AutoTypeColumnSchema(dimensionSchema.getName(), null)
                                                 )
                                                 .collect(Collectors.toList())
                                       ),
@@ -498,7 +498,7 @@ public abstract class BaseFilterTest extends 
InitializedNullHandlingTest
                                                         .getDimensions()
                                                         .stream()
                                                         .map(
-                                                            dimensionSchema -> 
new AutoTypeColumnSchema(dimensionSchema.getName())
+                                                            dimensionSchema -> 
new AutoTypeColumnSchema(dimensionSchema.getName(), null)
                                                         )
                                                         
.collect(Collectors.toList())
                                               ),
diff --git 
a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
 
b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
index 5eaacf0bf94..002a631e344 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
@@ -660,7 +660,6 @@ public class DataGeneratorTest extends 
InitializedNullHandlingTest
 
     IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
-        .setSortFacts(false)
         .setMaxRowCount(1_000_000)
         .build();
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index 2517ce1388f..0b45026e829 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -60,24 +60,18 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
 {
   public final IncrementalIndexCreator indexCreator;
 
+  private final String mode;
+
   @Rule
   public final CloserRule closer = new CloserRule(false);
 
-  public IncrementalIndexTest(String indexType, String mode, boolean 
deserializeComplexMetrics,
-                              IncrementalIndexSchema schema) throws 
JsonProcessingException
-  {
-    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, 
(builder, args) -> builder
-        .setIndexSchema(schema)
-        .setDeserializeComplexMetrics(deserializeComplexMetrics)
-        .setSortFacts("rollup".equals(mode))
-        .setMaxRowCount(1_000_000)
-        .build())
-    );
-  }
-
-  @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
-  public static Collection<?> constructorFeeder()
+  public IncrementalIndexTest(
+      String indexType,
+      String mode,
+      boolean deserializeComplexMetrics
+  ) throws JsonProcessingException
   {
+    this.mode = mode;
     DimensionsSpec dimensions = new DimensionsSpec(
         Arrays.asList(
             new StringDimensionSchema("string"),
@@ -86,11 +80,11 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
             new DoubleDimensionSchema("double"),
             new StringDimensionSchema("bool_string"),
             new LongDimensionSchema("bool_long"),
-            new AutoTypeColumnSchema("bool_auto"),
-            new AutoTypeColumnSchema("array_string"),
-            new AutoTypeColumnSchema("array_double"),
-            new AutoTypeColumnSchema("array_long"),
-            new AutoTypeColumnSchema("nested")
+            new AutoTypeColumnSchema("bool_auto", null),
+            new AutoTypeColumnSchema("array_string", ColumnType.STRING_ARRAY),
+            new AutoTypeColumnSchema("array_double", ColumnType.DOUBLE_ARRAY),
+            new AutoTypeColumnSchema("array_long", ColumnType.LONG_ARRAY),
+            new AutoTypeColumnSchema("nested", null)
         )
     );
     AggregatorFactory[] metrics = {
@@ -103,12 +97,22 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
         .withQueryGranularity(Granularities.MINUTE)
         .withDimensionsSpec(dimensions)
         .withMetrics(metrics)
+        .withRollup("rollup".equals(mode))
         .build();
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, 
(builder, args) -> builder
+        .setIndexSchema(schema)
+        .setDeserializeComplexMetrics(deserializeComplexMetrics)
+        .setMaxRowCount(1_000_000)
+        .build())
+    );
+  }
 
+  @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+  public static Collection<?> constructorFeeder()
+  {
     return IncrementalIndexCreator.indexTypeCartesianProduct(
         ImmutableList.of("rollup", "plain"),
-        ImmutableList.of(true, false),
-        ImmutableList.of(schema)
+        ImmutableList.of(true, false)
     );
   }
 
@@ -331,7 +335,7 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     index.add(row);
     index.add(row);
 
-    Assert.assertEquals(1, index.size());
+    Assert.assertEquals("rollup".equals(mode) ? 1 : 3, index.size());
   }
 
   @Test
@@ -399,7 +403,6 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(ColumnType.DOUBLE, 
index.getColumnCapabilities("double").toColumnType());
     Assert.assertEquals(ColumnType.STRING, 
index.getColumnCapabilities("bool_string").toColumnType());
     Assert.assertEquals(ColumnType.LONG, 
index.getColumnCapabilities("bool_long").toColumnType());
-    // depends on value of 'druid.expressions.useStrictBooleans', current 
default is false which parses as strings
     Assert.assertEquals(ColumnType.LONG, 
index.getColumnCapabilities("bool_auto").toColumnType());
     Assert.assertEquals(ColumnType.STRING_ARRAY, 
index.getColumnCapabilities("array_string").toColumnType());
     Assert.assertEquals(ColumnType.LONG_ARRAY, 
index.getColumnCapabilities("array_long").toColumnType());
@@ -416,9 +419,9 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     Assert.assertEquals("true", row.getRaw("bool_string"));
     Assert.assertEquals(1L, row.getRaw("bool_long"));
     Assert.assertEquals(StructuredData.wrap(true), row.getRaw("bool_auto"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of("a", "b", "c")), 
row.getRaw("array_string"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of(1, 2, 3)), 
row.getRaw("array_long"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of(1.1, 2.2, 3.3)), 
row.getRaw("array_double"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{"a", "b", "c"}), 
row.getRaw("array_string"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{1L, 2L, 3L}), 
row.getRaw("array_long"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{1.1, 2.2, 3.3}), 
row.getRaw("array_double"));
     Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 1, "y", 
ImmutableList.of("a", "b"))), row.getRaw("nested"));
 
     row = rowIterator.next();
@@ -429,9 +432,9 @@ public class IncrementalIndexTest extends 
InitializedNullHandlingTest
     Assert.assertEquals("false", row.getRaw("bool_string"));
     Assert.assertEquals(0L, row.getRaw("bool_long"));
     Assert.assertEquals(StructuredData.wrap(false), row.getRaw("bool_auto"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of("d", "e", "f")), 
row.getRaw("array_string"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of(4, 5, 6)), 
row.getRaw("array_long"));
-    Assert.assertEquals(StructuredData.wrap(ImmutableList.of(4.4, 5.5, 6.6)), 
row.getRaw("array_double"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{"d", "e", "f"}), 
row.getRaw("array_string"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{4L, 5L, 6L}), 
row.getRaw("array_long"));
+    Assert.assertEquals(StructuredData.wrap(new Object[]{4.4, 5.5, 6.6}), 
row.getRaw("array_double"));
     Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 2, "y", 
ImmutableList.of("c", "d"))), row.getRaw("nested"));
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
index d223926101b..a97fd378635 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -195,7 +195,7 @@ public class NestedDataColumnSupplierTest extends 
InitializedNullHandlingTest
           closer
       );
 
-      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
       for (Object o : data) {
         indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
       }
@@ -254,6 +254,7 @@ public class NestedDataColumnSupplierTest extends 
InitializedNullHandlingTest
         ColumnType.NESTED_DATA,
         false,
         false,
+        false,
         ByteOrder.nativeOrder(),
         RoaringBitmapSerdeFactory.getInstance()
     );
@@ -277,6 +278,7 @@ public class NestedDataColumnSupplierTest extends 
InitializedNullHandlingTest
         ColumnType.NESTED_DATA,
         false,
         false,
+        false,
         ByteOrder.nativeOrder(),
         RoaringBitmapSerdeFactory.getInstance()
     );
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index cebb40e7756..2e2a8a21e29 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -131,7 +131,7 @@ public class ScalarDoubleColumnSupplierTest extends 
InitializedNullHandlingTest
           closer
       );
 
-      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
       for (Object o : data) {
         indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
       }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index 945bb1e9e9e..801b88c3beb 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -131,7 +131,7 @@ public class ScalarLongColumnSupplierTest extends 
InitializedNullHandlingTest
           closer
       );
 
-      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
       for (Object o : data) {
         indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
       }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index 3f59521be73..cc9edb6a357 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -132,7 +132,7 @@ public class ScalarStringColumnSupplierTest extends 
InitializedNullHandlingTest
           closer
       );
 
-      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
       for (Object o : data) {
         indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
       }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index e394668ed80..b1eb65ea4ae 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -232,7 +232,7 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
     SegmentWriteOutMediumFactory writeOutMediumFactory = 
TmpFileSegmentWriteOutMediumFactory.instance();
     try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) {
 
-      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer();
+      AutoTypeColumnIndexer indexer = new AutoTypeColumnIndexer("test", null);
       for (Object o : data) {
         indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
       }
@@ -256,6 +256,7 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
       }
       VariantColumnSerializer serializer = new VariantColumnSerializer(
           fileNameBase,
+          expectedTypes.getSingleType() == null ? null : expectedLogicalType,
           expectedTypes.getSingleType() == null ? expectedTypes.getByteValue() 
: null,
           indexSpec,
           
writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()),
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
index 65316b8a760..7a2a9809930 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
@@ -145,11 +145,11 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
       new TimestampSpec("t", "iso", null),
       DimensionsSpec.builder().setDimensions(
           ImmutableList.<DimensionSchema>builder()
-                       .add(new AutoTypeColumnSchema("string"))
-                       .add(new AutoTypeColumnSchema("nest"))
-                       .add(new AutoTypeColumnSchema("nester"))
-                       .add(new AutoTypeColumnSchema("long"))
-                       .add(new AutoTypeColumnSchema("string_sparse"))
+                       .add(new AutoTypeColumnSchema("string", null))
+                       .add(new AutoTypeColumnSchema("nest", null))
+                       .add(new AutoTypeColumnSchema("nester", null))
+                       .add(new AutoTypeColumnSchema("long", null))
+                       .add(new AutoTypeColumnSchema("string_sparse", null))
                        .build()
       ).build(),
       null
@@ -160,8 +160,8 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
       DimensionsSpec.builder().setDimensions(
           ImmutableList.<DimensionSchema>builder()
                        .add(new StringDimensionSchema("string"))
-                       .add(new AutoTypeColumnSchema("nest"))
-                       .add(new AutoTypeColumnSchema("nester"))
+                       .add(new AutoTypeColumnSchema("nest", null))
+                       .add(new AutoTypeColumnSchema("nester", null))
                        .add(new LongDimensionSchema("long"))
                        .add(new StringDimensionSchema("string_sparse"))
                        .build()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to