Au-Miner commented on code in PR #28074:
URL: https://github.com/apache/flink/pull/28074#discussion_r3394338085


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java:
##########
@@ -50,11 +57,59 @@ public final class LogicalTypeUtils {
     private static final String ATOMIC_FIELD_NAME = "f0";
 
     private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover();
+    private static final NullabilityNormalizer NULLABILITY_NORMALIZER = new NullabilityNormalizer();
 
     public static LogicalType removeTimeAttributes(LogicalType logicalType) {
         return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
     }
 
+    public static LogicalType normalizeNullability(LogicalType logicalType) {
+        return logicalType.accept(NULLABILITY_NORMALIZER);
+    }
+
+    /**
+     * Returns true when new types are structurally equal to old types ignoring nullability, and no
+     * type narrows from nullable to non-nullable.
+     */
+    public static boolean areTypesCompatibleAfterNullabilityWidening(
+            LogicalType[] newTypes, LogicalType[] oldTypes) {
+        if (newTypes == oldTypes) {
+            return true;
+        }
+        if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) {
+            return false;
+        }
+        for (int i = 0; i < newTypes.length; i++) {
+            if (!isTypeCompatibleAfterNullabilityWidening(newTypes[i], oldTypes[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean isTypeCompatibleAfterNullabilityWidening(
+            LogicalType newType, LogicalType oldType) {
+        if (newType == oldType) {
+            return true;
+        }
+        if (newType == null || oldType == null) {
+            return false;
+        }
+        return normalizeNullability(newType).equals(normalizeNullability(oldType))
+                && !hasNullabilityNarrowing(newType, oldType);
+    }
+
+    public static boolean isSerializerCompatibleAfterNullabilityWidening(

Review Comment:
   Done. Moved the serializer helpers to a new 
SerializerSchemaCompatibilityUtils in flink-table-type-utils. Type-level 
helpers stay in LogicalTypeUtils.
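
   For readers following the thread, here is a minimal, self-contained sketch of
   the type-level rule described in the Javadoc above: equal structure once
   nullability is ignored, and no position narrowing from nullable to
   non-nullable. It does not use Flink classes. SimpleType is a hypothetical
   stand-in for LogicalType, and the bodies of normalizeNullability and
   hasNullabilityNarrowing are assumptions, since the diff hunk does not show
   how NullabilityNormalizer or hasNullabilityNarrowing are implemented in the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class NullabilityWideningSketch {

    /** Hypothetical stand-in for a logical type: a name, a nullability flag, child types. */
    static final class SimpleType {
        final String name;
        final boolean nullable;
        final List<SimpleType> children;

        SimpleType(String name, boolean nullable, SimpleType... children) {
            this.name = name;
            this.nullable = nullable;
            this.children = Arrays.asList(children);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleType)) {
                return false;
            }
            SimpleType other = (SimpleType) o;
            return name.equals(other.name)
                    && nullable == other.nullable
                    && children.equals(other.children);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, nullable, children);
        }
    }

    /** Assumed behavior: mark every level nullable so equals() compares structure only. */
    static SimpleType normalizeNullability(SimpleType type) {
        SimpleType[] normalized = new SimpleType[type.children.size()];
        for (int i = 0; i < normalized.length; i++) {
            normalized[i] = normalizeNullability(type.children.get(i));
        }
        return new SimpleType(type.name, true, normalized);
    }

    /** Assumed behavior: true if any position is nullable in oldType but not in newType. */
    static boolean hasNullabilityNarrowing(SimpleType newType, SimpleType oldType) {
        if (oldType.nullable && !newType.nullable) {
            return true;
        }
        // Only called after the structural check, so both child lists have the same size.
        for (int i = 0; i < newType.children.size(); i++) {
            if (hasNullabilityNarrowing(newType.children.get(i), oldType.children.get(i))) {
                return true;
            }
        }
        return false;
    }

    /** Mirrors the shape of isTypeCompatibleAfterNullabilityWidening in the diff. */
    static boolean isCompatible(SimpleType newType, SimpleType oldType) {
        if (newType == oldType) {
            return true;
        }
        if (newType == null || oldType == null) {
            return false;
        }
        return normalizeNullability(newType).equals(normalizeNullability(oldType))
                && !hasNullabilityNarrowing(newType, oldType);
    }

    public static void main(String[] args) {
        SimpleType oldRow = new SimpleType("ROW", true, new SimpleType("INT", false));
        SimpleType widened = new SimpleType("ROW", true, new SimpleType("INT", true));
        System.out.println(isCompatible(widened, oldRow)); // NOT NULL -> nullable: widening
        System.out.println(isCompatible(oldRow, widened)); // nullable -> NOT NULL: narrowing
    }
}
```

   The structural comparison runs first, so the narrowing walk only ever sees
   two types of the same shape. In this sketch the first call returns true and
   the second returns false.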



