abhishekrb19 commented on code in PR #14319:
URL: https://github.com/apache/druid/pull/14319#discussion_r1199575200


##########
processing/src/main/java/org/apache/druid/segment/column/ColumnType.java:
##########
@@ -152,7 +152,7 @@ public static ColumnType ofComplex(@Nullable String complexTypeName)
    *                                                                   inference
    */
   @Nullable
-  public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other)
+  public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) throws IncompatibleTypeException

Review Comment:
   Good call on the new exception type. You may also want to update the javadoc to reflect the `IllegalArgumentException` -> `IncompatibleTypeException` change.



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -995,4 +986,117 @@ void doInLock(Runnable runnable)
      runnable.run();
    }
  }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
+   * for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in
+   * Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with
+   * a single type to represent it globally.
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FirstTypeMergePolicy.class)
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = FirstTypeMergePolicy.NAME, value = FirstTypeMergePolicy.class),
+      @JsonSubTypes.Type(name = LeastRestrictiveTypeMergePolicy.NAME, value = LeastRestrictiveTypeMergePolicy.class)
+  })
+  @FunctionalInterface
+  public interface ColumnTypeMergePolicy
+  {
+    ColumnType merge(ColumnType existingType, ColumnType newType);
+  }
+
+  /**
+   * Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated
+   * segments starting from the most recent time chunk, so this typically results in the most recently used type being
+   * chosen, at least for systems that are continuously updated with 'current' data.
+   *
+   * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
+   * are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible
+   * types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary
+   * in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead.
+   */
+  public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
+  {
+    public static final String NAME = "newestFirst";
+
+    @Override
+    public ColumnType merge(ColumnType existingType, ColumnType newType)
+    {
+      if (existingType == null) {
+        return newType;
+      }
+      if (newType == null) {
+        return existingType;
+      }
+      // if any are json, are all json
+      if (ColumnType.NESTED_DATA.equals(newType) || ColumnType.NESTED_DATA.equals(existingType)) {
+        return ColumnType.NESTED_DATA;
+      }
+      // "existing type" is the 'newest' type, since we iterate the segments list by newest start time
+      return existingType;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(NAME);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public String toString()
+    {
+      return NAME;
+    }
+  }
+
+  /**
+   * Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, ColumnType)} to find the ColumnType that
+   * can best represent all data contained across all segments.

Review Comment:
   Is the type actually resolved from all the segments, or only from the most recent `MAX_SEGMENTS_PER_QUERY` segments per query (hardcoded to 15000)?
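For reference, the quoted `FirstTypeMergePolicy.merge` logic amounts to a fold over per-segment types, iterating newest segment first. A minimal, self-contained sketch (plain strings stand in for Druid's `ColumnType`; class and method names here are illustrative, not from the PR):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for FirstTypeMergePolicy, using plain strings in
// place of Druid's ColumnType (this simplification is ours, not the PR's).
class FirstTypeMergeSketch
{
  static final String NESTED = "COMPLEX<json>";

  // Keep the existing (newest-seen) type; nested data is "sticky" and wins.
  static String merge(String existing, String incoming)
  {
    if (existing == null) {
      return incoming;
    }
    if (incoming == null) {
      return existing;
    }
    if (NESTED.equals(existing) || NESTED.equals(incoming)) {
      return NESTED;
    }
    return existing;
  }

  // Fold the policy over per-segment types, newest segment first.
  static String resolve(List<String> typesNewestFirst)
  {
    String merged = null;
    for (String type : typesNewestFirst) {
      merged = merge(merged, type);
    }
    return merged;
  }
}
```

With this shape, the answer to the question above hinges only on which segments feed the fold, not on the merge function itself.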



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -808,20 +812,7 @@ DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(final String dataSour
                rowSignature.getColumnType(column)
                            .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
 
-            columnTypes.compute(column, (c, existingType) -> {

Review Comment:
   With the new `leastRestrictive` merge policy, the above comment `// Newer column types should override older ones.` may no longer hold true?
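To illustrate the point: under a least-restrictive style merge, an older segment's type can widen the result, so the newest type no longer simply wins. A self-contained sketch (plain strings in place of Druid's `ColumnType`; the widening order below is a simplified assumption for demonstration, not Druid's actual type matrix):

```java
// Illustrative widening-order sketch of a least-restrictive merge, using
// plain strings in place of Druid's ColumnType. The ordering is assumed.
class LeastRestrictiveSketch
{
  static final java.util.List<String> WIDENING_ORDER =
      java.util.List.of("LONG", "FLOAT", "DOUBLE", "STRING");

  // Return whichever type is wider; nulls defer to the other argument.
  static String leastRestrictive(String type, String other)
  {
    if (type == null) {
      return other;
    }
    if (other == null) {
      return type;
    }
    return WIDENING_ORDER.indexOf(type) >= WIDENING_ORDER.indexOf(other) ? type : other;
  }
}
```

Here, merging a newer `LONG` with an older `DOUBLE` yields `DOUBLE`, so the older type can indeed override the newer one.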



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -995,4 +986,117 @@ void doInLock(Runnable runnable)
      runnable.run();
    }
  }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
+   * for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in
+   * Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with
+   * a single type to represent it globally.
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FirstTypeMergePolicy.class)
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = FirstTypeMergePolicy.NAME, value = FirstTypeMergePolicy.class),
+      @JsonSubTypes.Type(name = LeastRestrictiveTypeMergePolicy.NAME, value = LeastRestrictiveTypeMergePolicy.class)
+  })
+  @FunctionalInterface
+  public interface ColumnTypeMergePolicy
+  {
+    ColumnType merge(ColumnType existingType, ColumnType newType);
+  }
+
+  /**
+   * Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated
+   * segments starting from the most recent time chunk, so this typically results in the most recently used type being
+   * chosen, at least for systems that are continuously updated with 'current' data.
+   *
+   * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
+   * are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible
+   * types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary
+   * in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead.
+   */
+  public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy

Review Comment:
   nit: should we align this class name with the policy name, e.g. `NewestFirstTypeMergePolicy` or `LatestTypeMergePolicy`?



##########
processing/src/main/java/org/apache/druid/segment/column/ColumnType.java:
##########
@@ -218,13 +220,21 @@ public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullab
    }
 
    // all numbers win over longs
-    // floats vs doubles would be handled here, but we currently only support doubles...
    if (Types.is(type, ValueType.LONG) && Types.isNullOr(other, ValueType.LONG)) {
      return ColumnType.LONG;
    }
+    // doubles win over floats
    if (Types.is(type, ValueType.FLOAT) && Types.isNullOr(other, ValueType.FLOAT)) {
      return ColumnType.FLOAT;
    }
    return ColumnType.DOUBLE;
  }
+
+  public static class IncompatibleTypeException extends IAE
+  {
+    public IncompatibleTypeException(ColumnType type, ColumnType other)
+    {
+      super("Cannot implicitly cast %s to %s", type, other);

Review Comment:
   ```suggestion
         super("Cannot implicitly cast [%s] to [%s]", type, other);
   ```
   
   nit: I think it would also be helpful to pass in the column name, so it's clear which column has the incompatible types.
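A minimal, self-contained sketch of what that suggestion could look like. The `columnName` parameter is hypothetical, and plain `IllegalArgumentException` plus `String.format` stand in for Druid's `IAE` and its format-string constructor:

```java
// Hypothetical sketch of the suggestion: carry the column name in the
// exception message. The real IncompatibleTypeException in the PR extends
// Druid's IAE and does not (yet) take a column name.
class IncompatibleTypeExceptionSketch extends IllegalArgumentException
{
  IncompatibleTypeExceptionSketch(String columnName, String type, String other)
  {
    super(String.format("Cannot implicitly cast [%s] to [%s] for column [%s]", type, other, columnName));
  }
}
```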



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -995,4 +986,117 @@ void doInLock(Runnable runnable)
      runnable.run();
    }
  }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
+   * for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in
+   * Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with
+   * a single type to represent it globally.
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FirstTypeMergePolicy.class)
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = FirstTypeMergePolicy.NAME, value = FirstTypeMergePolicy.class),
+      @JsonSubTypes.Type(name = LeastRestrictiveTypeMergePolicy.NAME, value = LeastRestrictiveTypeMergePolicy.class)
+  })
+  @FunctionalInterface
+  public interface ColumnTypeMergePolicy
+  {
+    ColumnType merge(ColumnType existingType, ColumnType newType);
+  }
+
+  /**
+   * Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated
+   * segments starting from the most recent time chunk, so this typically results in the most recently used type being
+   * chosen, at least for systems that are continuously updated with 'current' data.
+   *
+   * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
+   * are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible

Review Comment:
   ```suggestion
   * are partially or fully computed by this cache, this merge policy can result in query time errors if incompatible
   ```



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -995,4 +986,117 @@ void doInLock(Runnable runnable)
       runnable.run();
     }
   }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types

Review Comment:
   nice commentary, the code is easy to follow 👍 



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/SegmentMetadataCacheConfig.java:
##########
@@ -113,6 +94,7 @@ public String toString()
            ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
            ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
            ", awaitInitializationOnStart=" + awaitInitializationOnStart +
+           ", columnTypeMergePolicy=" + metadataColumnTypeMergePolicy +

Review Comment:
   ```suggestion
           ", metadataColumnTypeMergePolicy=" + metadataColumnTypeMergePolicy +
   ```



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -995,4 +986,117 @@ void doInLock(Runnable runnable)
      runnable.run();
    }
  }
+
+
+  /**
+   * ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
+   * for the same column from segment to segment. It is used to help compute a {@link RowSignature} for a table in
+   * Druid based on the segment metadata of all segments, merging the types of each column encountered to end up with
+   * a single type to represent it globally.
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FirstTypeMergePolicy.class)
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = FirstTypeMergePolicy.NAME, value = FirstTypeMergePolicy.class),
+      @JsonSubTypes.Type(name = LeastRestrictiveTypeMergePolicy.NAME, value = LeastRestrictiveTypeMergePolicy.class)
+  })
+  @FunctionalInterface
+  public interface ColumnTypeMergePolicy
+  {
+    ColumnType merge(ColumnType existingType, ColumnType newType);
+  }
+
+  /**
+   * Classic logic, we use the first type we encounter. This policy is effectively 'newest first' because we iterated
+   * segments starting from the most recent time chunk, so this typically results in the most recently used type being
+   * chosen, at least for systems that are continuously updated with 'current' data.
+   *
+   * Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
+   * are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible
+   * types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary
+   * in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead.
+   */
+  public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
+  {
+    public static final String NAME = "newestFirst";
+
+    @Override
+    public ColumnType merge(ColumnType existingType, ColumnType newType)
+    {
+      if (existingType == null) {
+        return newType;
+      }
+      if (newType == null) {
+        return existingType;
+      }
+      // if any are json, are all json
+      if (ColumnType.NESTED_DATA.equals(newType) || ColumnType.NESTED_DATA.equals(existingType)) {
+        return ColumnType.NESTED_DATA;
+      }
+      // "existing type" is the 'newest' type, since we iterate the segments list by newest start time
+      return existingType;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(NAME);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public String toString()
+    {
+      return NAME;
+    }
+  }
+
+  /**
+   * Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, ColumnType)} to find the ColumnType that
+   * can best represent all data contained across all segments.

Review Comment:
   Also, I wonder what the performance implications of choosing the `leastRestrictive` strategy are, given that this policy has to scan many/all segments per data source. Should we explicitly call out any gotchas in the documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

