danny0405 commented on code in PR #9133:
URL: https://github.com/apache/hudi/pull/9133#discussion_r1255291076


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java:
##########
@@ -165,21 +192,132 @@ void add(int pos, LogicalType fromType, LogicalType 
toType) {
         }
         break;
       }
+      case ARRAY: {
+        if (from == ARRAY) {
+          LogicalType fromElementType =  fromType.getChildren().get(0);
+          LogicalType toElementType = toType.getChildren().get(0);
+          return array -> doArrayConversion((ArrayData) array, 
fromElementType, toElementType);
+        }
+        break;
+      }
+      case MAP: {
+        if (from == MAP) {
+          return map -> doMapConversion((MapData) map, fromType, toType);
+        }
+        break;
+      }
+      case ROW: {
+        if (from == ROW) {
+          // Assumption: InternalSchemaManager should produce a cast that is 
of the same size
+          return row -> doRowConversion((RowData) row, fromType, toType);
+        }
+        break;
+      }
       default:
     }
-    return null;
+    throw new IllegalArgumentException(String.format("Unsupported conversion 
for %s => %s", fromType, toType));
   }
 
-  private void add(int pos, Cast cast) {
-    castMap.put(pos, cast);
+  /**
+   * Helper function to perform convert an arrayData from one LogicalType to 
another.
+   *
+   * @param array    Non-null array data to be converted; however 
array-elements are allowed to be null
+   * @param fromType The input LogicalType of the row data to be converted from
+   * @param toType   The output LogicalType of the row data to be converted to
+   * @return Converted array that has the structure/specifications of that 
defined by the output LogicalType
+   */
+  private static ArrayData doArrayConversion(@Nonnull ArrayData array, 
LogicalType fromType, LogicalType toType) {
+    // using Object type here as primitives are not allowed to be null
+    Object[] objects = new Object[array.size()];
+    for (int i = 0; i < array.size(); i++) {
+      Object fromObject = 
ArrayData.createElementGetter(fromType).getElementOrNull(array, i);
+      // need to handle nulls to prevent NullPointerException in 
#getConversion()
+      Object toObject = fromObject != null ? getConversion(fromType, 
toType).apply(fromObject) : null;
+      objects[i] = toObject;
+    }
+    return new GenericArrayData(objects);
+  }
+
+  /**
+   * Helper function to perform convert a MapData from one LogicalType to 
another.
+   *
+   * @param map      Non-null map data to be converted; however, values are 
allowed to be null
+   * @param fromType The input LogicalType of the row data to be converted from
+   * @param toType   The output LogicalType of the row data to be converted to
+   * @return Converted map that has the structure/specifications of that 
defined by the output LogicalType
+   */
+  private static MapData doMapConversion(@Nonnull MapData map, LogicalType 
fromType, LogicalType toType) {
+    // no schema evolution is allowed on the keyType, hence, we only need to 
care about the valueType
+    LogicalType fromValueType = fromType.getChildren().get(1);
+    LogicalType toValueType = toType.getChildren().get(1);
+    LogicalType keyType = fromType.getChildren().get(0);
+
+    final Map<Object, Object> result = new HashMap<>();
+    for (int i = 0; i < map.size(); i++) {
+      Object keyObject = 
ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i);
+      Object fromObject = 
ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), 
i);
+      // need to handle nulls to prevent NullPointerException in 
#getConversion()
+      Object toObject = fromObject != null ? getConversion(fromValueType, 
toValueType).apply(fromObject) : null;
+      result.put(keyObject, toObject);
+    }
+    return new GenericMapData(result);
+  }
+
+  /**
+   * Helper function to perform convert a RowData from one LogicalType to 
another.
+   *
+   * @param row      Non-null row data to be converted; however, fields might 
contain nulls
+   * @param fromType The input LogicalType of the row data to be converted from
+   * @param toType   The output LogicalType of the row data to be converted to
+   * @return Converted row that has the structure/specifications of that 
defined by the output LogicalType
+   */
+  private static RowData doRowConversion(@Nonnull RowData row, LogicalType 
fromType, LogicalType toType) {
+    // note: InternalSchema.merge guarantees that the schema to be read 
fromType is orientated in the same order as toType
+    // hence, we can match types by position as it is guaranteed that it is 
referencing the same field
+    List<LogicalType> fromChildren = fromType.getChildren();
+    List<LogicalType> toChildren = toType.getChildren();
+    ValidationUtils.checkArgument(fromChildren.size() == toChildren.size(),
+        "fromType [" + fromType + "] size: != toType [" + toType + "] size");
+
+    GenericRowData rowData = new GenericRowData(toType.getChildren().size());
+    for (int i = 0; i < toChildren.size(); i++) {
+      Object fromVal = RowData.createFieldGetter(fromChildren.get(i), 
i).getFieldOrNull(row);
+      Object toVal;

Review Comment:
   caution for the performance, because you are constructing the field getter 
for each row->row conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to