This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 209f8a95468 Deserialize complex dimensions in group by queries to their respective types when reading from spilled files and cached results (#16620)
209f8a95468 is described below

commit 209f8a95468e8450e3d1365d7f0eda98a35732df
Author: Laksh Singla <lakshsin...@gmail.com>
AuthorDate: Mon Jul 15 15:00:17 2024 +0530

    Deserialize complex dimensions in group by queries to their respective types when reading from spilled files and cached results (#16620)

    Like #16511, but for keys that have been spilled or cached during the grouping process
---
 .../apache/druid/jackson/AggregatorsModule.java    |  15 +-
 .../org/apache/druid/query/QueryToolChest.java     |  21 ++-
 .../DataSourceQueryQueryToolChest.java             |   8 +
 .../query/groupby/GroupByQueryQueryToolChest.java  | 106 ++++++++-----
 .../druid/query/groupby/epinephelinae/Grouper.java |  11 ++
 .../epinephelinae/RowBasedGrouperHelper.java       | 135 ++++++++++++++---
 .../epinephelinae/RowBasedKeySerdeHelper.java      |   5 +
 .../groupby/epinephelinae/SpillingGrouper.java     |   2 +-
 .../SegmentMetadataQueryQueryToolChest.java        |  11 ++
 .../query/search/SearchQueryQueryToolChest.java    |  10 ++
 .../TimeBoundaryQueryQueryToolChest.java           |  12 ++
 .../timeseries/TimeseriesQueryQueryToolChest.java  |  12 ++
 .../druid/query/topn/TopNQueryQueryToolChest.java  |  13 +-
 .../column/ObjectStrategyComplexTypeStrategy.java  |   6 +-
 .../druid/segment/column/TypeStrategies.java       |  30 ++++
 .../apache/druid/segment/column/TypeStrategy.java  |   2 +-
 .../query/aggregation/AggregationTestHelper.java   |   8 +-
 .../groupby/ComplexDimensionGroupByQueryTest.java  | 164 +++++++++++++++++++++
 .../groupby/GroupByQueryQueryToolChestTest.java    |  94 +++++++++---
 .../query/groupby/GroupByQueryRunnerTest.java      |   5 +-
 .../druid/client/CachingClusteredClient.java       |   2 +-
 .../apache/druid/client/CachingQueryRunner.java    |   2 +-
 .../druid/query/ResultLevelCachingQueryRunner.java |   2 +-
 .../druid/client/CachingQueryRunnerTest.java       |  15 +-
 24 files changed, 594 insertions(+), 97 deletions(-)
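
For context, a minimal illustrative sketch (not part of the patch) of the problem being fixed: when a grouping key containing a complex dimension is written to the cache or a spill file as JSON and read back as a generic Object, Jackson produces a LinkedHashMap rather than the complex key class. ObjectMapper.convertValue(), which this patch applies when pulling complex dimensions from the cache, re-maps the generic tree onto the expected class:

    ObjectMapper mapper = new ObjectMapper();
    byte[] cached = mapper.writeValueAsBytes(new SerializablePairLongString(1L, "abc"));
    // Reading back as Object loses the complex type: a generic LinkedHashMap comes out.
    Object generic = mapper.readValue(cached, Object.class);
    // convertValue re-maps the generic tree onto the expected key class.
    SerializablePairLongString restored = mapper.convertValue(generic, SerializablePairLongString.class);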

diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index f7aca511e17..200e6fcb139 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -83,6 +83,16 @@ public class AggregatorsModule extends SimpleModule
   {
     super("AggregatorFactories");
 
+    registerComplexMetricsAndSerde();
+
+    setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
+    setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
+
+    addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
+  }
+
+  public static void registerComplexMetricsAndSerde()
+  {
     ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde());
     ComplexMetrics.registerSerde(PreComputedHyperUniquesSerde.TYPE_NAME, new PreComputedHyperUniquesSerde());
     ComplexMetrics.registerSerde(
@@ -102,11 +112,6 @@ public class AggregatorsModule extends SimpleModule
         SerializablePairLongLongComplexMetricSerde.TYPE_NAME,
         new SerializablePairLongLongComplexMetricSerde()
     );
-
-    setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
-    setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
-
-    addSerializer(DoubleMeanHolder.class, DoubleMeanHolder.Serializer.INSTANCE);
   }
 
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
index b0678f247c9..fa394beec43 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java
@@ -251,19 +251,36 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
    */
   public abstract TypeReference<ResultType> getResultTypeReference();
 
+  /**
+   * Like {@link #getCacheStrategy(Query, ObjectMapper)}, but the caller does not supply an object mapper for
+   * deserializing and converting the cached data to the desired type. It is up to the individual implementations to
+   * decide the appropriate action in that case: either throw an exception outright, or work with the generic Java
+   * types if the query does not require the object mapper for proper downstream processing.
+   * <p>
+   * @deprecated Use {@link #getCacheStrategy(Query, ObjectMapper)} instead
+   */
+  @Deprecated
+  @Nullable
+  public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
+  {
+    return null;
+  }
+
   /**
   * Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
   * <p>
   * This is optional.  If it returns null, caching is effectively disabled for the query.
   *
   * @param query The query whose results might be cached
+   * @param mapper Object mapper to convert the deserialized generic Java objects to the desired types. It can be
+   *               nullable to preserve backward compatibility.
   * @param <T>   The type of object that will be stored in the cache
   * @return A CacheStrategy that can be used to populate and read from the Cache
    */
   @Nullable
-  public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
+  public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query, @Nullable ObjectMapper mapper)
   {
-    return null;
+    return getCacheStrategy(query);
   }
 
   /**
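
(Illustrative sketch, not part of the patch: the two-overload pattern that each toolchest below follows so that legacy callers of the deprecated one-arg method keep working. "MyQuery" and "MyResult" are hypothetical placeholder types.)

    @Override
    public CacheStrategy<MyResult, Object, MyQuery> getCacheStrategy(MyQuery query)
    {
      // The deprecated overload delegates to the new one with a null mapper.
      return getCacheStrategy(query, null);
    }

    @Override
    public CacheStrategy<MyResult, Object, MyQuery> getCacheStrategy(
        final MyQuery query,
        @Nullable final ObjectMapper mapper
    )
    {
      return new CacheStrategy<MyResult, Object, MyQuery>() { /* ... */ };
    }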
diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
index dbe8922f2e9..21fb5c53afc 100644
--- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.datasourcemetadata;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.inject.Inject;
@@ -38,6 +39,7 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.timeline.LogicalSegment;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -119,4 +121,10 @@ public class DataSourceQueryQueryToolChest
   {
     return null;
   }
+
+  @Override
+  public CacheStrategy getCacheStrategy(DataSourceMetadataQuery query, @Nullable ObjectMapper mapper)
+  {
+    return null;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index b19b479c26d..d69e09c9ff0 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -77,8 +77,10 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.NullableTypeStrategy;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.nested.StructuredData;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -471,7 +473,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
     // Deserializer that can deserialize either array- or map-based rows.
     final JsonDeserializer<ResultRow> deserializer = new JsonDeserializer<ResultRow>()
     {
-      final Class<?>[] dimensionClasses = createDimensionClasses();
+      final Class<?>[] dimensionClasses = createDimensionClasses(query);
       boolean containsComplexDimensions = query.getDimensions()
                                                .stream()
                                                .anyMatch(
@@ -524,30 +526,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
           return ResultRow.of(objectArray);
         }
       }
-
-      private Class<?>[] createDimensionClasses()
-      {
-        final List<DimensionSpec> queryDimensions = query.getDimensions();
-        final Class<?>[] classes = new Class[queryDimensions.size()];
-        for (int i = 0; i < queryDimensions.size(); ++i) {
-          final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
-          if (dimensionOutputType.is(ValueType.COMPLEX)) {
-            NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
-            if (!nullableTypeStrategy.groupable()) {
-              throw DruidException.defensive(
-                  "Ungroupable dimension [%s] with type [%s] found in the query.",
-                  queryDimensions.get(i).getDimension(),
-                  dimensionOutputType
-              );
-            }
-            classes[i] = nullableTypeStrategy.getClazz();
-          } else {
-            classes[i] = Object.class;
-          }
-        }
-        return classes;
-      }
-
     };
 
     class GroupByResultRowModule extends SimpleModule
@@ -597,9 +575,32 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
     );
   }
 
+  @Nullable
   @Override
-  public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(final GroupByQuery query)
+  public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(GroupByQuery query)
   {
+    return getCacheStrategy(query, null);
+  }
+
+  @Override
+  public CacheStrategy<ResultRow, Object, GroupByQuery> getCacheStrategy(
+      final GroupByQuery query,
+      @Nullable final ObjectMapper mapper
+  )
+  {
+
+    for (DimensionSpec dimension : query.getDimensions()) {
+      if (dimension.getOutputType().is(ValueType.COMPLEX) && !dimension.getOutputType().equals(ColumnType.NESTED_DATA)) {
+        if (mapper == null) {
+          throw DruidException.defensive(
+              "Cannot deserialize complex dimension of type[%s] from result cache if object mapper is not provided",
+              dimension.getOutputType().getComplexTypeName()
+          );
+        }
+      }
+    }
+    final Class<?>[] dimensionClasses = createDimensionClasses(query);
+
     return new CacheStrategy<ResultRow, Object, GroupByQuery>()
     {
       private static final byte CACHE_STRATEGY_VERSION = 0x1;
@@ -726,13 +727,29 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
             int dimPos = 0;
             while (dimsIter.hasNext() && results.hasNext()) {
               final DimensionSpec dimensionSpec = dimsIter.next();
-
-              // Must convert generic Jackson-deserialized type into the proper type.
-              resultRow.set(
-                  dimensionStart + dimPos,
-                  DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
-              );
-
+              final Object dimensionObject = results.next();
+              final Object dimensionObjectCasted;
+
+              final ColumnType outputType = dimensionSpec.getOutputType();
+
+              // Must convert generic Jackson-deserialized type into the proper type. The downstream functions expect
+              // the dimensions to be of appropriate types for further processing, like merging and comparing.
+              if (outputType.is(ValueType.COMPLEX)) {
+                // JSON columns can interpret generic data objects appropriately, hence they are wrapped as is in
+                // StructuredData. They don't need to be converted from Object.class to StructuredData.class using the
+                // object mapper, as that would be an expensive and wasteful operation.
+                if (outputType.equals(ColumnType.NESTED_DATA)) {
+                  dimensionObjectCasted = StructuredData.wrap(dimensionObject);
+                } else {
+                  dimensionObjectCasted = mapper.convertValue(dimensionObject, dimensionClasses[dimPos]);
+                }
+              } else {
+                dimensionObjectCasted = DimensionHandlerUtils.convertObjectToType(
+                    dimensionObject,
+                    dimensionSpec.getOutputType()
+                );
+              }
+              resultRow.set(dimensionStart + dimPos, dimensionObjectCasted);
               dimPos++;
             }
 
@@ -861,4 +878,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
 
     return retVal;
   }
+
+  private static Class<?>[] createDimensionClasses(final GroupByQuery query)
+  {
+    final List<DimensionSpec> queryDimensions = query.getDimensions();
+    final Class<?>[] classes = new Class[queryDimensions.size()];
+    for (int i = 0; i < queryDimensions.size(); ++i) {
+      final ColumnType dimensionOutputType = queryDimensions.get(i).getOutputType();
+      if (dimensionOutputType.is(ValueType.COMPLEX)) {
+        NullableTypeStrategy nullableTypeStrategy = dimensionOutputType.getNullableStrategy();
+        if (!nullableTypeStrategy.groupable()) {
+          throw DruidException.defensive(
+              "Ungroupable dimension [%s] with type [%s] found in the query.",
+              queryDimensions.get(i).getDimension(),
+              dimensionOutputType
+          );
+        }
+        classes[i] = nullableTypeStrategy.getClazz();
+      } else {
+        classes[i] = Object.class;
+      }
+    }
+    return classes;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
index 591624f1ab8..0f3faedb707 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Grouper.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.groupby.epinephelinae;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -232,6 +233,16 @@ public interface Grouper<KeyType> extends Closeable
      */
     BufferComparator bufferComparatorWithAggregators(AggregatorFactory[] aggregatorFactories, int[] aggregatorOffsets);
 
+    /**
+     * Decorates the object mapper, enabling it to read and write the query results' grouping keys. It is used by the
+     * {@link SpillingGrouper} to preserve the types of the dimensions after serializing them to and deserializing
+     * them from the spilled files.
+     */
+    default ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
+    {
+      return spillMapper;
+    }
+
     /**
      * Reset the keySerde to its initial state. After this method is called, {@link #readFromByteBuffer}
      * and {@link #bufferComparator()} may no longer work properly on previously-serialized keys.
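
(Illustrative sketch, not part of the patch: what a decorateObjectMapper() implementation looks like, following the Jackson SimpleModule pattern that RowBasedGrouperHelper uses below. "MyKey" and "MyKeyDeserializer" are hypothetical placeholders.)

    @Override
    public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
    {
      final SimpleModule module = new SimpleModule("spill-keys");
      // Restore typed grouping keys instead of Jackson's generic maps and lists.
      module.addDeserializer(MyKey.class, new MyKeyDeserializer());
      // Copy before registering so the shared spill mapper is not mutated.
      final ObjectMapper decorated = spillMapper.copy();
      decorated.registerModule(module);
      return decorated;
    }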
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 491c28d4142..da8a0e04623 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -19,9 +19,14 @@
 
 package org.apache.druid.query.groupby.epinephelinae;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.ObjectCodec;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.primitives.Ints;
@@ -84,6 +89,7 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -666,22 +672,6 @@ public class RowBasedGrouperHelper
       this.key = key;
     }
 
-    @JsonCreator
-    public static RowBasedKey fromJsonArray(final Object[] key)
-    {
-      // Type info is lost during serde:
-    // Floats may be deserialized as doubles, Longs may be deserialized as integers, convert them back
-      for (int i = 0; i < key.length; i++) {
-        if (key[i] instanceof Integer) {
-          key[i] = ((Integer) key[i]).longValue();
-        } else if (key[i] instanceof Double) {
-          key[i] = ((Double) key[i]).floatValue();
-        }
-      }
-
-      return new RowBasedKey(key);
-    }
-
     @JsonValue
     public Object[] getKey()
     {
@@ -1371,6 +1361,65 @@ public class RowBasedGrouperHelper
       );
     }
 
+    @Override
+    public ObjectMapper decorateObjectMapper(ObjectMapper spillMapper)
+    {
+
+      final JsonDeserializer<RowBasedKey> deserializer = new JsonDeserializer<RowBasedKey>()
+      {
+        @Override
+        public RowBasedKey deserialize(
+            JsonParser jp,
+            DeserializationContext deserializationContext
+        ) throws IOException
+        {
+          if (!jp.isExpectedStartArrayToken()) {
+            throw DruidException.defensive("Expected array start token, received [%s]", jp.getCurrentToken());
+          }
+          jp.nextToken();
+
+          final ObjectCodec codec = jp.getCodec();
+          final int timestampAdjustment = includeTimestamp ? 1 : 0;
+          final int dimsToRead = timestampAdjustment + serdeHelpers.length;
+          int dimsReadSoFar = 0;
+          final Object[] objects = new Object[dimsToRead];
+
+          if (includeTimestamp) {
+            DruidException.conditionalDefensive(
+                jp.currentToken() != JsonToken.END_ARRAY,
+                "Unexpected end of array when deserializing timestamp from the spilled files"
+            );
+            objects[dimsReadSoFar] = codec.readValue(jp, Long.class);
+
+            ++dimsReadSoFar;
+            jp.nextToken();
+          }
+
+          while (jp.currentToken() != JsonToken.END_ARRAY) {
+            objects[dimsReadSoFar] =
+                codec.readValue(jp, serdeHelpers[dimsReadSoFar - timestampAdjustment].getClazz());
+
+            ++dimsReadSoFar;
+            jp.nextToken();
+          }
+
+          return new RowBasedKey(objects);
+        }
+      };
+
+      class SpillModule extends SimpleModule
+      {
+        public SpillModule()
+        {
+          addDeserializer(RowBasedKey.class, deserializer);
+        }
+      }
+
+      final ObjectMapper newObjectMapper = spillMapper.copy();
+      newObjectMapper.registerModule(new SpillModule());
+      return newObjectMapper;
+    }
+
     @Override
     public void reset()
     {
@@ -1588,6 +1637,7 @@ public class RowBasedGrouperHelper
     {
       final BufferComparator bufferComparator;
       final String columnTypeName;
+      final Class<?> clazz;
 
       final List<Object> dictionary;
       final Object2IntMap<Object> reverseDictionary;
@@ -1613,6 +1663,7 @@ public class RowBasedGrouperHelper
                dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition)),
                dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition))
             );
+        clazz = columnType.getNullableStrategy().getClazz();
       }
 
      // Asserts that we don't entertain any complex types without a typename, to prevent intermixing dictionaries of
@@ -1645,6 +1696,12 @@ public class RowBasedGrouperHelper
       {
         return reverseDictionary;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return clazz;
+      }
     }
 
 
@@ -1726,6 +1783,14 @@ public class RowBasedGrouperHelper
       {
         return reverseDictionary;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        // Jackson deserializes an Object[] containing longs to an Object[] containing strings if Object[].class is
+        // returned. Therefore we use Object.class.
+        return Object.class;
+      }
     }
 
    private class ArrayStringRowBasedKeySerdeHelper extends DictionaryBuildingSingleValuedRowBasedKeySerdeHelper
@@ -1770,6 +1835,12 @@ public class RowBasedGrouperHelper
       {
         return reverseStringArrayDictionary;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return Object[].class;
+      }
     }
 
    private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
@@ -1819,6 +1890,12 @@ public class RowBasedGrouperHelper
       {
         return bufferComparator;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return String.class;
+      }
     }
 
    private class DynamicDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper
@@ -1937,6 +2014,12 @@ public class RowBasedGrouperHelper
       {
         return bufferComparator;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return Long.class;
+      }
     }
 
     private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
@@ -1982,6 +2065,12 @@ public class RowBasedGrouperHelper
       {
         return bufferComparator;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return Float.class;
+      }
     }
 
    private class DoubleRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
@@ -2027,6 +2116,12 @@ public class RowBasedGrouperHelper
       {
         return bufferComparator;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return Double.class;
+      }
     }
 
     // This class is only used when SQL compatible null handling is enabled.
@@ -2082,6 +2177,12 @@ public class RowBasedGrouperHelper
       {
         return comparator;
       }
+
+      @Override
+      public Class<?> getClazz()
+      {
+        return delegate.getClazz();
+      }
     }
   }
 }
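
(Illustrative standalone sketch, not part of the patch: the token-by-token array reading that the RowBasedKey deserializer above performs, with hardcoded input and per-position target classes standing in for RowBasedKeySerdeHelper.getClazz().)

    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class SpillKeyReadSketch
    {
      public static void main(String[] args) throws IOException
      {
        ObjectMapper mapper = new ObjectMapper();
        JsonParser jp = mapper.getFactory().createParser("[1234567890123, \"dimValue\"]");
        jp.nextToken(); // position on START_ARRAY
        jp.nextToken(); // move to the first element
        // Reading each element against an explicit class keeps a long a Long
        // instead of letting Jackson pick a generic representation.
        Class<?>[] classes = {Long.class, String.class};
        List<Object> key = new ArrayList<>();
        int i = 0;
        while (jp.currentToken() != JsonToken.END_ARRAY) {
          key.add(jp.getCodec().readValue(jp, classes[i++]));
          jp.nextToken(); // step past the value just read
        }
        System.out.println(key); // [1234567890123, dimValue]
      }
    }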
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
index 1cb29d23bc0..71372ca238b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedKeySerdeHelper.java
@@ -65,4 +65,9 @@ interface RowBasedKeySerdeHelper
    * Return a {@link BufferComparator} to compare keys stored in ByteBuffer.
    */
   BufferComparator getBufferComparator();
+
+  /**
+   * Returns the expected class of the key, which is used to deserialize the objects correctly from the spilled files.
+   */
+  Class<?> getClazz();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 4e9b96102a1..d8a7760c11d 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -152,7 +152,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
     }
     this.aggregatorFactories = aggregatorFactories;
     this.temporaryStorage = temporaryStorage;
-    this.spillMapper = spillMapper;
+    this.spillMapper = keySerde.decorateObjectMapper(spillMapper);
     this.spillingAllowed = spillingAllowed;
     this.sortHasNonGroupingFields = sortHasNonGroupingFields;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 912ecb1ac32..fd8d7e7009c 100644
--- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.metadata;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -62,6 +63,7 @@ import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -184,6 +186,15 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
 
   @Override
   public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(final SegmentMetadataQuery query)
+  {
+    return getCacheStrategy(query, null);
+  }
+
+  @Override
+  public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> getCacheStrategy(
+      final SegmentMetadataQuery query,
+      @Nullable final ObjectMapper objectMapper
+  )
   {
     return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
     {
diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
index b390cd83a58..c15e1d0d99c 100644
--- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.search;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -124,6 +125,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
 
   @Override
   public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(final SearchQuery query)
+  {
+    return getCacheStrategy(query, null);
+  }
+
+  @Override
+  public CacheStrategy<Result<SearchResultValue>, Object, SearchQuery> getCacheStrategy(
+      final SearchQuery query,
+      @Nullable final ObjectMapper objectMapper
+  )
   {
 
     return new CacheStrategy<Result<SearchResultValue>, Object, SearchQuery>()
diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
index 9087dd26a88..eab5e0f5abc 100644
--- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.timeboundary;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -47,6 +48,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.timeline.LogicalSegment;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.List;
@@ -163,6 +165,16 @@ public class TimeBoundaryQueryQueryToolChest
 
   @Override
   public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(final TimeBoundaryQuery query)
+  {
+    return getCacheStrategy(query, null);
+  }
+
+
+  @Override
+  public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery> getCacheStrategy(
+      final TimeBoundaryQuery query,
+      @Nullable final ObjectMapper objectMapper
+  )
   {
     return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
     {
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 17a2f8be956..67c36fe7603 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.timeseries;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -276,6 +278,16 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
 
   @Override
   public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(final TimeseriesQuery query)
+  {
+    return getCacheStrategy(query, null);
+  }
+
+
+  @Override
+  public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> getCacheStrategy(
+      final TimeseriesQuery query,
+      @Nullable final ObjectMapper objectMapper
+  )
   {
     return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
     {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 25a4284aa42..21bc336438a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.topn;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -64,6 +65,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.RowSignature;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -268,9 +270,18 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     return TYPE_REFERENCE;
   }
 
+  @Nullable
+  @Override
+  public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(TopNQuery query)
+  {
+    return getCacheStrategy(query, null);
+  }
 
   @Override
-  public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
+  public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(
+      final TopNQuery query,
+      @Nullable final ObjectMapper objectMapper
+  )
   {
     return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
     {
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
index b274e55282e..f80a1cdcf8d 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ObjectStrategyComplexTypeStrategy.java
@@ -123,7 +123,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
   public int hashCode(T o)
   {
     if (hashStrategy == null) {
-      throw DruidException.defensive("hashStrategy not provided");
+      throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
     }
     return hashStrategy.hashCode(o);
   }
@@ -132,7 +132,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
   public boolean equals(T a, T b)
   {
     if (hashStrategy == null) {
-      throw DruidException.defensive("hashStrategy not provided");
+      throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
     }
     return hashStrategy.equals(a, b);
   }
@@ -141,7 +141,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
   public Class<?> getClazz()
   {
     if (clazz == null) {
-      throw DruidException.defensive("hashStrategy not provided");
+      throw DruidException.defensive("Type [%s] is not groupable", typeSignature.asTypeString());
     }
     return clazz;
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
index bae29179b4d..7ac8def99ec 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategies.java
@@ -299,6 +299,12 @@ public class TypeStrategies
     {
       return a.equals(b);
     }
+
+    @Override
+    public Class<?> getClazz()
+    {
+      return Long.class;
+    }
   }
 
   /**
@@ -368,6 +374,12 @@ public class TypeStrategies
     {
       return a.equals(b);
     }
+
+    @Override
+    public Class<?> getClazz()
+    {
+      return Float.class;
+    }
   }
 
   /**
@@ -438,6 +450,12 @@ public class TypeStrategies
     {
       return a.equals(b);
     }
+
+    @Override
+    public Class<?> getClazz()
+    {
+      return Double.class;
+    }
   }
 
   /**
@@ -519,6 +537,12 @@ public class TypeStrategies
     {
       return a.equals(b);
     }
+
+    @Override
+    public Class<?> getClazz()
+    {
+      return String.class;
+    }
   }
 
   /**
@@ -664,5 +688,11 @@ public class TypeStrategies
         return false;
       }
     }
+
+    @Override
+    public Class<?> getClazz()
+    {
+      return Object[].class;
+    }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
index c5cff1a0b2f..075fceca473 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/TypeStrategy.java
@@ -225,6 +225,6 @@ public interface TypeStrategy<T> extends Comparator<Object>, Hash.Strategy<T>
    */
   default Class<?> getClazz()
   {
-    throw DruidException.defensive("Not implemented. It is only implemented for complex dimensions which are groupable()");
+    throw DruidException.defensive("Not implemented. Check groupable() first");
   }
 }
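
(Illustrative sketch, not part of the patch: the calling convention for the new getClazz(), mirroring createDimensionClasses() above. It assumes the complex serde has been registered, e.g. via AggregatorsModule.registerComplexMetricsAndSerde().)

    ColumnType type = ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME);
    NullableTypeStrategy strategy = type.getNullableStrategy();
    // Check groupable() first: the default getClazz() throws a defensive exception.
    Class<?> clazz = strategy.groupable() ? strategy.getClazz() : Object.class;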
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 526a62c813f..2ad9f90148a 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -766,7 +766,7 @@ public class AggregationTestHelper implements Closeable
           String resultStr = mapper.writer().writeValueAsString(yielder);
 
           List<ResultRow> resultRows = Lists.transform(
-              readQueryResultArrayFromString(resultStr),
+              readQueryResultArrayFromString(resultStr, queryPlus.getQuery()),
               toolChest.makePreComputeManipulatorFn(
                   queryPlus.getQuery(),
                   MetricManipulatorFns.deserializing()
@@ -798,11 +798,13 @@ public class AggregationTestHelper implements Closeable
     };
   }
 
-  private List readQueryResultArrayFromString(String str) throws Exception
+  private List readQueryResultArrayFromString(String str, Query query) throws Exception
   {
     List result = new ArrayList();
 
-    JsonParser jp = mapper.getFactory().createParser(str);
+    ObjectMapper decoratedMapper = toolChest.decorateObjectMapper(mapper, query);
+
+    JsonParser jp = decoratedMapper.getFactory().createParser(str);
 
     if (jp.nextToken() != JsonToken.START_ARRAY) {
       throw new IAE("not an array [%s]", str);
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java
new file mode 100644
index 00000000000..bc1ecbb0ddc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/ComplexDimensionGroupByQueryTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class ComplexDimensionGroupByQueryTest
+{
+  private final QueryContexts.Vectorize vectorize;
+  private final AggregationTestHelper helper;
+  private final List<Segment> segments;
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  public ComplexDimensionGroupByQueryTest(GroupByQueryConfig config, String vectorize)
+  {
+    this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
+    this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        Collections.emptyList(),
+        config,
+        tempFolder
+    );
+    Sequence<Object[]> rows = Sequences.simple(
+        ImmutableList.of(
+            new Object[]{new SerializablePairLongString(1L, "abc")},
+            new Object[]{new SerializablePairLongString(1L, "abc")},
+            new Object[]{new SerializablePairLongString(1L, "def")},
+            new Object[]{new SerializablePairLongString(1L, "abc")},
+            new Object[]{new SerializablePairLongString(1L, "ghi")},
+            new Object[]{new SerializablePairLongString(1L, "def")},
+            new Object[]{new SerializablePairLongString(1L, "abc")},
+            new Object[]{new SerializablePairLongString(1L, "pqr")},
+            new Object[]{new SerializablePairLongString(1L, "xyz")},
+            new Object[]{new SerializablePairLongString(1L, "foo")},
+            new Object[]{new SerializablePairLongString(1L, "bar")}
+        )
+    );
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add(
+                                                "pair",
+                                                ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+                                            )
+                                            .build();
+
+    this.segments = Collections.singletonList(
+        new RowBasedSegment<>(
+            SegmentId.dummy("dummy"),
+            rows,
+            columnName -> {
+              final int columnNumber = rowSignature.indexOf(columnName);
+              return row -> columnNumber >= 0 ? row[columnNumber] : null;
+            },
+            rowSignature
+        )
+    );
+  }
+
+  @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
+  public static Collection<?> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+    for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
+      for (String vectorize : new String[]{"false", "force"}) {
+        constructors.add(new Object[]{config, vectorize});
+      }
+    }
+    return constructors;
+  }
+
+  public Map<String, Object> getContext()
+  {
+    return ImmutableMap.of(
+        QueryContexts.VECTORIZE_KEY, vectorize.toString(),
+        QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "true"
+    );
+  }
+
+  @Test
+  public void testGroupByOnPairClass()
+  {
+    GroupByQuery groupQuery = GroupByQuery.builder()
+                                          .setDataSource("test_datasource")
+                                          .setGranularity(Granularities.ALL)
+                                          .setInterval(Intervals.ETERNITY)
+                                          .setDimensions(new DefaultDimensionSpec(
+                                              "pair",
+                                              "pair",
+                                              ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+                                          ))
+                                          .setAggregatorSpecs(new CountAggregatorFactory("count"))
+                                          .setContext(getContext())
+                                          .build();
+
+    if (vectorize == QueryContexts.Vectorize.FORCE) {
+      // Cannot vectorize group by on complex dimension
+      Assert.assertThrows(
+          RuntimeException.class,
+          () -> helper.runQueryOnSegmentsObjs(segments, groupQuery).toList()
+      );
+    } else {
+      List<ResultRow> resultRows = helper.runQueryOnSegmentsObjs(segments, groupQuery).toList();
+
+      Assert.assertArrayEquals(
+          new ResultRow[]{
+              ResultRow.of(new SerializablePairLongString(1L, "abc"), 4L),
+              ResultRow.of(new SerializablePairLongString(1L, "bar"), 1L),
+              ResultRow.of(new SerializablePairLongString(1L, "def"), 2L),
+              ResultRow.of(new SerializablePairLongString(1L, "foo"), 1L),
+              ResultRow.of(new SerializablePairLongString(1L, "ghi"), 1L),
+              ResultRow.of(new SerializablePairLongString(1L, "pqr"), 1L),
+              ResultRow.of(new SerializablePairLongString(1L, "xyz"), 1L)
+          },
+          resultRows.toArray()
+      );
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index f43bbce9d97..7279ca938bd 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Row;
+import org.apache.druid.jackson.AggregatorsModule;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -97,6 +98,7 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
   public static void setUpClass()
   {
     NullHandling.initializeForTests();
+    AggregatorsModule.registerComplexMetricsAndSerde();
   }
 
   @Test
@@ -130,11 +132,13 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -190,11 +194,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         )
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -252,11 +257,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setHavingSpec(new GreaterThanHavingSpec(QueryRunnerTestHelper.UNIQUE_METRIC, 10))
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -336,11 +342,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setHavingSpec(andHavingSpec2)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -427,11 +434,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setHavingSpec(havingSpec2)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -490,11 +498,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         ))
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -512,6 +521,48 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
     doTestCacheStrategy(ColumnType.LONG, 2L);
   }
 
+  @Test
+  public void testComplexDimensionCacheStrategy() throws IOException
+  {
+    final GroupByQuery query1 = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(ImmutableList.of(
+            new DefaultDimensionSpec(
+                "test",
+                "test",
+                ColumnType.ofComplex(SerializablePairLongStringComplexMetricSerde.TYPE_NAME)
+            )
+        ))
+        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .build();
+
+    ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+
+    CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, objectMapper);
+
+    // test timestamps that result in integer size millis
+    final ResultRow result1 = ResultRow.of(
+        123L,
+        new SerializablePairLongString(123L, "abc"),
+        1
+    );
+
+    Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1);
+
+    Object fromCacheValue = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(preparedValue),
+        strategy.getCacheObjectClazz()
+    );
+
+    ResultRow fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
+
+    Assert.assertEquals(result1, fromCacheResult);
+  }
+
   @Test
   public void testMultiColumnCacheStrategy() throws Exception
   {
@@ -538,8 +589,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     // test timestamps that result in integer size millis
     final ResultRow result1 = ResultRow.of(
@@ -1054,8 +1106,9 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     // test timestamps that result in integer size millis
     final ResultRow result1 = ResultRow.of(
@@ -1147,11 +1200,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertFalse(Arrays.equals(
@@ -1183,11 +1237,12 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, "false"))
         .build();
 
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query1, mapper);
 
     final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 =
-        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2);
+        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query2, mapper);
 
     Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
     Assert.assertTrue(
@@ -1245,7 +1300,8 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     );
     final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
-    CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = queryToolChest.getCacheStrategy(query);
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
+    CacheStrategy<ResultRow, Object, GroupByQuery> cacheStrategy = 
queryToolChest.getCacheStrategy(query, mapper);
     Assert.assertTrue(
         "result level cache on broker server for GroupByStrategyV2 should be 
enabled",
         cacheStrategy.isCacheable(query, false, false)
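
For reference, the call-site pattern shared by the test hunks above is the new two-argument overload: the Jackson ObjectMapper lets the cache strategy deserialize complex grouping keys back to their declared types when results are read from cache. A minimal sketch in the style of these tests, with the query construction elided:

    // The mapper is threaded into the strategy so that complex-typed
    // dimension values survive a cache round trip; primitive keys are
    // handled exactly as before.
    final ObjectMapper mapper = TestHelper.makeJsonMapper();
    final CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
        new GroupByQueryQueryToolChest(null, null).getCacheStrategy(query, mapper);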
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index d4dc8734130..a5dbb49bca5 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.Rows;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
@@ -9965,7 +9966,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
   @Test
   public void testGroupByComplexColumn()
   {
-    cannotVectorize();
     GroupByQuery query = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
@@ -9979,7 +9979,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
         .build();
 
-    expectedException.expect(RuntimeException.class);
+    expectedException.expect(DruidException.class);
+    expectedException.expectMessage("Type [COMPLEX<hyperUnique>] is not groupable");
     GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
   }
 
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 7bcb4c2ce03..5fa34d6699d 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -275,7 +275,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
       this.responseContext = responseContext;
       this.query = queryPlus.getQuery();
       this.toolChest = warehouse.getToolChest(query);
-      this.strategy = toolChest.getCacheStrategy(query);
+      this.strategy = toolChest.getCacheStrategy(query, objectMapper);
       this.dataSourceAnalysis = query.getDataSource().getAnalysis();
 
       this.useCache = CacheUtil.isUseSegmentCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
diff --git a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
index 9bb9f474dd9..41d4bb4ea63 100644
--- a/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/client/CachingQueryRunner.java
@@ -86,7 +86,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
   public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
   {
     Query<T> query = queryPlus.getQuery();
-    final CacheStrategy strategy = toolChest.getCacheStrategy(query);
+    final CacheStrategy strategy = toolChest.getCacheStrategy(query, mapper);
     final boolean populateCache = canPopulateCache(query, strategy);
     final boolean useCache = canUseCache(query, strategy);
 
diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 182faba7a09..0af6ebca3ed 100644
--- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -73,7 +73,7 @@ public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
     this.cache = cache;
     this.cacheConfig = cacheConfig;
     this.query = query;
-    this.strategy = queryToolChest.getCacheStrategy(query);
+    this.strategy = queryToolChest.getCacheStrategy(query, objectMapper);
     this.populateResultCache = CacheUtil.isPopulateResultCache(
         query,
         strategy,
diff --git a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
index a4375a61900..7208ab2fc4b 100644
--- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
@@ -68,6 +68,7 @@ import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.query.topn.TopNQueryConfig;
 import org.apache.druid.query.topn.TopNQueryQueryToolChest;
 import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.junit.Assert;
@@ -90,7 +91,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 @RunWith(Parameterized.class)
-public class CachingQueryRunnerTest
+public class CachingQueryRunnerTest extends InitializedNullHandlingTest
 {
   @Parameterized.Parameters(name = "numBackgroundThreads={0}")
   public static Iterable<Object[]> constructorFeeder()
@@ -222,8 +223,8 @@ public class CachingQueryRunnerTest
     Cache cache = EasyMock.mock(Cache.class);
     EasyMock.replay(cache);
     CachingQueryRunner queryRunner = makeCachingQueryRunner(null, cache, toolchest, Sequences.empty());
-    Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query)));
-    Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query)));
+    Assert.assertFalse(queryRunner.canPopulateCache(query, toolchest.getCacheStrategy(query, null)));
+    Assert.assertFalse(queryRunner.canUseCache(query, toolchest.getCacheStrategy(query, null)));
     queryRunner.run(QueryPlus.wrap(query));
     EasyMock.verifyUnexpectedCalls(cache);
   }
@@ -243,7 +244,7 @@ public class CachingQueryRunnerTest
 
     QueryToolChest toolchest = EasyMock.mock(QueryToolChest.class);
     Cache cache = EasyMock.mock(Cache.class);
-    EasyMock.expect(toolchest.getCacheStrategy(query)).andReturn(null);
+    EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query), EasyMock.anyObject())).andReturn(null);
     EasyMock.replay(cache, toolchest);
     CachingQueryRunner queryRunner = makeCachingQueryRunner(new byte[0], cache, toolchest, Sequences.empty());
     Assert.assertFalse(queryRunner.canPopulateCache(query, null));
@@ -339,7 +340,7 @@ public class CachingQueryRunnerTest
         resultSeq
     );
 
-    CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
+    CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
     Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
         CACHE_ID,
         SEGMENT_DESCRIPTOR,
@@ -383,7 +384,7 @@ public class CachingQueryRunnerTest
 
     byte[] cacheKeyPrefix = RandomUtils.nextBytes(10);
 
-    CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query);
+    CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query, null);
     Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey(
         CACHE_ID,
         SEGMENT_DESCRIPTOR,
@@ -399,7 +400,7 @@ public class CachingQueryRunnerTest
         toolchest,
         Sequences.empty()
     );
-    Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query)));
+    Assert.assertTrue(runner.canUseCache(query, toolchest.getCacheStrategy(query, null)));
     List<Result> results = runner.run(QueryPlus.wrap(query)).toList();
     Assert.assertEquals(expectedResults.toString(), results.toString());
   }
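
One EasyMock detail from the mock-based test above: once any argument of an expected call uses a matcher, every argument must, which is why the two-argument expectation wraps the query in eq() alongside anyObject() instead of passing it bare:

    // EasyMock rejects mixing a raw value and a matcher in a single
    // expectation, so both arguments are expressed as matchers here.
    EasyMock.expect(toolchest.getCacheStrategy(EasyMock.eq(query), EasyMock.anyObject()))
            .andReturn(null);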


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
