This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d1cbf68bfb [core] Support default values for complex data types (#6046)
d1cbf68bfb is described below

commit d1cbf68bfb91e490d2a5b58875f9780824f0ea64
Author: xiaochen <[email protected]>
AuthorDate: Sat Aug 16 17:24:29 2025 +0800

    [core] Support default values for complex data types (#6046)
---
 docs/content/flink/default-value.md                |  14 +-
 docs/content/spark/default-value.md                |  34 +++-
 .../java/org/apache/paimon/utils/StringUtils.java  |   8 +
 .../org/apache/paimon/casting/CastExecutors.java   |   4 +
 .../paimon/casting/StringToArrayCastRule.java      | 165 ++++++++++++++++
 .../apache/paimon/casting/StringToMapCastRule.java | 207 ++++++++++++++++++++
 .../apache/paimon/casting/StringToRowCastRule.java | 196 +++++++++++++++++++
 .../apache/paimon/casting/CastExecutorTest.java    | 149 +++++++++++++++
 .../apache/paimon/data/DefaultValueRowTest.java    | 159 ++++++++++++++++
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 102 ++++++++++
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 210 +++++++++++++++++++++
 11 files changed, 1244 insertions(+), 4 deletions(-)

diff --git a/docs/content/flink/default-value.md 
b/docs/content/flink/default-value.md
index 9030d6f14b..3da7ef5e7e 100644
--- a/docs/content/flink/default-value.md
+++ b/docs/content/flink/default-value.md
@@ -37,7 +37,10 @@ Flink SQL does not have native support for default values, 
so we can only create
 CREATE TABLE my_table (
     a BIGINT,
     b STRING,
-    c INT
+    c INT,
+    tags ARRAY<STRING>,
+    properties MAP<STRING, STRING>,
+    nested ROW<x INT, y STRING>
 );
 ```
 
@@ -45,8 +48,14 @@ We support the procedure of modifying column default values 
in Flink. You can ad
 creating the table:
 
 ```sql
+-- Set simple type default values
 CALL sys.alter_column_default_value('default.my_table', 'b', 'my_value');
 CALL sys.alter_column_default_value('default.my_table', 'c', '5');
+
+-- Set complex type default values
+CALL sys.alter_column_default_value('default.my_table', 'tags', '[tag1, tag2, 
tag3]');
+CALL sys.alter_column_default_value('default.my_table', 'properties', '{key1 
-> value1, key2 -> value2}');
+CALL sys.alter_column_default_value('default.my_table', 'nested', '{42, 
default_value}');
 ```
 
 ## Insert Table
@@ -60,5 +69,6 @@ For example:
 INSERT INTO my_table (a) VALUES (1), (2);
 
 SELECT * FROM my_table;
--- result: [[1, 5, my_value], [2, 5, my_value]]
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {key1=value1, key2=value2}, 
+I[42, default_value]],
+--          [2, my_value, 5, [tag1, tag2, tag3], {key1=value1, key2=value2}, 
+I[42, default_value]]]
 ```
diff --git a/docs/content/spark/default-value.md 
b/docs/content/spark/default-value.md
index ba5423133e..a8b4f3c4e6 100644
--- a/docs/content/spark/default-value.md
+++ b/docs/content/spark/default-value.md
@@ -37,7 +37,10 @@ You can create a table with columns with default values 
using the following SQL:
 CREATE TABLE my_table (
     a BIGINT,
     b STRING DEFAULT 'my_value',
-    c INT DEFAULT 5 
+    c INT DEFAULT 5,
+    tags ARRAY<STRING> DEFAULT ARRAY('tag1', 'tag2', 'tag3'),
+    properties MAP<STRING, STRING> DEFAULT MAP('key1', 'value1', 'key2', 
'value2'),
+    nested STRUCT<x: INT, y: STRING> DEFAULT STRUCT(42, 'default_value')
 );
 ```
 
@@ -46,6 +49,16 @@ CREATE TABLE my_table (
 For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, 
and `MERGE` commands, the `DEFAULT` keyword
 or `NULL` value is parsed into the default value specified for the 
corresponding column.
 
+For example:
+
+```sql
+INSERT INTO my_table (a) VALUES (1), (2);
+
+SELECT * FROM my_table;
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}]]
+```
+
 ## Alter Default Value
 
 Paimon supports alter column default value.
@@ -64,7 +77,24 @@ INSERT INTO T (a) VALUES (2);
 -- result: [[1, 2], [2, 3]]
 ```
 
-The default value of `'b'` column has been changed to 3 from 2.
+The default value of column `b` has been changed from 2 to 3. You can also alter default values for complex types:
+
+```sql
+ALTER TABLE my_table ALTER COLUMN tags SET DEFAULT ARRAY('new_tag1', 
'new_tag2');
+
+INSERT INTO my_table (a) VALUES (3);
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [3, my_value, 5, [new_tag1, new_tag2], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}]]
+
+ALTER TABLE my_table ALTER COLUMN properties SET DEFAULT MAP('new_key', 
'new_value');
+
+INSERT INTO my_table (a) VALUES (4);
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [3, my_value, 5, [new_tag1, new_tag2], {'key1' -> 'value1', 'key2' 
-> 'value2'}, {42, default_value}],
+--          [4, my_value, 5, [new_tag1, new_tag2], {'new_key' -> 'new_value'}, 
{42, default_value}]]
+```
 
 ## Limitation
 
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
index ca32b50b68..e20151f35e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -554,4 +554,12 @@ public class StringUtils {
         }
         return value.toLowerCase();
     }
+
+    public static boolean isOpenBracket(char c) {
+        return c == '[' || c == '{' || c == '(';
+    }
+
+    public static boolean isCloseBracket(char c) {
+        return c == ']' || c == '}' || c == ')';
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
index 5971f80c4d..6822c2c3ce 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
@@ -70,6 +70,10 @@ public class CastExecutors {
                 .addRule(StringToTimeCastRule.INSTANCE)
                 .addRule(StringToTimestampCastRule.INSTANCE)
                 .addRule(StringToBinaryCastRule.INSTANCE)
+                .addRule(StringToArrayCastRule.INSTANCE)
+                .addRule(StringToMapCastRule.INSTANCE)
+                .addRule(StringToRowCastRule.INSTANCE)
+
                 // Date/Time/Timestamp rules
                 .addRule(TimestampToTimestampCastRule.INSTANCE)
                 .addRule(TimestampToDateCastRule.INSTANCE)
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java
 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java
new file mode 100644
index 0000000000..2e48e0b825
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#ARRAY} cast 
rule. */
+class StringToArrayCastRule extends AbstractCastRule<BinaryString, 
InternalArray> {
+
+    static final StringToArrayCastRule INSTANCE = new StringToArrayCastRule();
+
+    // Pattern for bracket format: [element1, element2, element3]
+    private static final Pattern BRACKET_ARRAY_PATTERN = 
Pattern.compile("^\\s*\\[(.*)\\]\\s*$");
+
+    // Pattern for SQL function format: ARRAY(element1, element2, element3)
+    private static final Pattern FUNCTION_ARRAY_PATTERN =
+            Pattern.compile("^\\s*ARRAY\\s*\\((.*)\\)\\s*$", 
Pattern.CASE_INSENSITIVE);
+
+    private StringToArrayCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(DataTypeFamily.CHARACTER_STRING)
+                        .target(DataTypeRoot.ARRAY)
+                        .build());
+    }
+
+    @Override
+    public CastExecutor<BinaryString, InternalArray> create(
+            DataType inputType, DataType targetType) {
+        ArrayType arrayType = (ArrayType) targetType;
+        @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object> elementCastExecutor =
+                (CastExecutor<BinaryString, Object>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, 
arrayType.getElementType());
+        if (elementCastExecutor == null) {
+            throw new RuntimeException(
+                    "Cannot cast string to array element type: " + 
arrayType.getElementType());
+        }
+        return value -> parseArray(value, elementCastExecutor);
+    }
+
+    private InternalArray parseArray(
+            BinaryString value, CastExecutor<BinaryString, Object> 
elementCastExecutor) {
+        try {
+            String str = value.toString().trim();
+            if ("[]".equals(str) || "ARRAY()".equalsIgnoreCase(str)) {
+                return new GenericArray(new Object[0]);
+            }
+
+            String content = extractArrayContent(str);
+            if (content.isEmpty()) {
+                return new GenericArray(new Object[0]);
+            }
+
+            List<Object> elements = parseArrayElements(content, 
elementCastExecutor);
+            return new GenericArray(elements.toArray());
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Cannot parse '" + value + "' as ARRAY: " + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Extract content from array string, supporting both bracket format [a, 
b, c] and SQL function
+     * format ARRAY(a, b, c).
+     */
+    private String extractArrayContent(String str) {
+        // Try bracket format first: [element1, element2, element3]
+        Matcher bracketMatcher = BRACKET_ARRAY_PATTERN.matcher(str);
+        if (bracketMatcher.matches()) {
+            return bracketMatcher.group(1).trim();
+        }
+
+        // Try SQL function format: ARRAY(element1, element2, element3)
+        Matcher functionMatcher = FUNCTION_ARRAY_PATTERN.matcher(str);
+        if (functionMatcher.matches()) {
+            return functionMatcher.group(1).trim();
+        }
+
+        throw new RuntimeException(
+                "Invalid array format: " + str + ". Expected format: [a, b, c] 
or ARRAY(a, b, c)");
+    }
+
+    private List<Object> parseArrayElements(
+            String content, CastExecutor<BinaryString, Object> 
elementCastExecutor) {
+        List<Object> elements = new ArrayList<>();
+        for (String token : splitArrayElements(content)) {
+            String trimmedToken = token.trim();
+            Object element =
+                    "null".equals(trimmedToken)
+                            ? null
+                            : 
elementCastExecutor.cast(BinaryString.fromString(trimmedToken));
+            elements.add(element);
+        }
+        return elements;
+    }
+
+    private List<String> splitArrayElements(String content) {
+        List<String> elements = new ArrayList<>();
+        StringBuilder current = new StringBuilder();
+        Stack<Character> bracketStack = new Stack<>();
+        boolean inQuotes = false;
+        boolean escaped = false;
+
+        for (char c : content.toCharArray()) {
+            if (escaped) {
+                escaped = false;
+            } else if (c == '\\') {
+                escaped = true;
+            } else if (c == '"') {
+                inQuotes = !inQuotes;
+            } else if (!inQuotes) {
+                if (StringUtils.isOpenBracket(c)) {
+                    bracketStack.push(c);
+                } else if (StringUtils.isCloseBracket(c) && 
!bracketStack.isEmpty()) {
+                    bracketStack.pop();
+                } else if (c == ',' && bracketStack.isEmpty()) {
+                    addCurrentElement(elements, current);
+                    continue;
+                }
+            }
+            current.append(c);
+        }
+
+        addCurrentElement(elements, current);
+        return elements;
+    }
+
+    private void addCurrentElement(List<String> elements, StringBuilder 
current) {
+        if (current.length() > 0) {
+            elements.add(current.toString());
+            current.setLength(0);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java
 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java
new file mode 100644
index 0000000000..cc366605ee
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#MAP} cast rule.
+ *
+ * <p>Two textual forms are accepted: the bracket form {@code {k1 -> v1, k2 -> v2}} and the function
+ * form {@code MAP(k1, v1, k2, v2)} (keyword matched case-insensitively). Keys and values are
+ * trimmed and converted with the cast executors resolved for the map's key and value types; the
+ * literal token {@code null} is converted to a null reference in both forms.
+ */
+class StringToMapCastRule extends AbstractCastRule<BinaryString, InternalMap> {
+
+    static final StringToMapCastRule INSTANCE = new StringToMapCastRule();
+
+    // Pattern for bracket format: {key1 -> value1, key2 -> value2}
+    private static final Pattern BRACKET_MAP_PATTERN = Pattern.compile("^\\s*\\{(.*)\\}\\s*$");
+
+    // Pattern for SQL function format: MAP('key1', 'value1', 'key2', 'value2')
+    private static final Pattern FUNCTION_MAP_PATTERN =
+            Pattern.compile("^\\s*MAP\\s*\\((.*)\\)\\s*$", Pattern.CASE_INSENSITIVE);
+
+    // One bracket-format entry: the key is everything before the first "->".
+    private static final Pattern ENTRY_PATTERN = Pattern.compile("(.+?)\\s*->\\s*(.+)");
+
+    private StringToMapCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(DataTypeFamily.CHARACTER_STRING)
+                        .target(DataTypeRoot.MAP)
+                        .build());
+    }
+
+    /**
+     * Builds the executor for one concrete map type.
+     *
+     * @throws RuntimeException if no string cast exists for the key type or the value type
+     */
+    @Override
+    public CastExecutor<BinaryString, InternalMap> create(DataType inputType, DataType targetType) {
+        MapType mapType = (MapType) targetType;
+        CastExecutor<BinaryString, Object> keyCastExecutor =
+                createCastExecutor(mapType.getKeyType());
+        CastExecutor<BinaryString, Object> valueCastExecutor =
+                createCastExecutor(mapType.getValueType());
+
+        return value -> parseMap(value, keyCastExecutor, valueCastExecutor);
+    }
+
+    /** Parses the whole literal; any failure is rethrown with the offending input attached. */
+    private InternalMap parseMap(
+            BinaryString value,
+            CastExecutor<BinaryString, Object> keyCastExecutor,
+            CastExecutor<BinaryString, Object> valueCastExecutor) {
+        try {
+            String str = value.toString().trim();
+            if ("{}".equals(str) || "MAP()".equalsIgnoreCase(str)) {
+                return new GenericMap(new HashMap<>());
+            }
+            return new GenericMap(parseDefaultMap(str, keyCastExecutor, valueCastExecutor));
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot parse '" + value + "' as MAP: " + e.getMessage(), e);
+        }
+    }
+
+    /** Resolves the string-to-{@code targetType} executor, failing fast when none exists. */
+    private CastExecutor<BinaryString, Object> createCastExecutor(DataType targetType) {
+        @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object> executor =
+                (CastExecutor<BinaryString, Object>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, targetType);
+        if (executor == null) {
+            throw new RuntimeException("Cannot cast string to type: " + targetType);
+        }
+        return executor;
+    }
+
+    /** Dispatches on the outer syntax, trying the bracket form before the function form. */
+    private Map<Object, Object> parseDefaultMap(
+            String str,
+            CastExecutor<BinaryString, Object> keyCastExecutor,
+            CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+        Matcher bracketMatcher = BRACKET_MAP_PATTERN.matcher(str);
+        if (bracketMatcher.matches()) {
+            // Parse bracket format (arrow-separated entries)
+            String content = bracketMatcher.group(1).trim();
+            return parseMapEntry(content, keyCastExecutor, valueCastExecutor);
+        }
+
+        Matcher functionMatcher = FUNCTION_MAP_PATTERN.matcher(str);
+        if (functionMatcher.matches()) {
+            String functionContent = functionMatcher.group(1).trim();
+            return parseFunctionDefaultMap(functionContent, keyCastExecutor, valueCastExecutor);
+        }
+
+        throw new RuntimeException(
+                "Invalid map format: " + str + ". Expected format: {k -> v} or MAP(k, v)");
+    }
+
+    /**
+     * Parses the argument list of the function form, where arguments alternate between key and
+     * value.
+     *
+     * @throws RuntimeException if the number of arguments is odd
+     * @throws IllegalStateException if the same key appears more than once
+     */
+    private Map<Object, Object> parseFunctionDefaultMap(
+            String content,
+            CastExecutor<BinaryString, Object> keyCastExecutor,
+            CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+        List<String> elements = splitMapEntries(content.trim());
+        if (elements.size() % 2 != 0) {
+            throw new RuntimeException("Invalid Function map format: odd number of elements");
+        }
+
+        // Fill the map by hand rather than through Collectors.toMap: toMap throws a
+        // NullPointerException for null values, which made MAP(k, null) unparseable even though
+        // the bracket form {k -> null} is accepted.
+        Map<Object, Object> mapContent = Maps.newHashMap();
+        for (int i = 0; i < elements.size(); i += 2) {
+            Object key = parseValue(elements.get(i).trim(), keyCastExecutor);
+            Object mapValue = parseValue(elements.get(i + 1).trim(), valueCastExecutor);
+            if (mapContent.containsKey(key)) {
+                // Duplicate keys were already an error with toMap; keep rejecting them.
+                throw new IllegalStateException("Duplicate key " + key);
+            }
+            mapContent.put(key, mapValue);
+        }
+        return mapContent;
+    }
+
+    /**
+     * Parses the content of the bracket form, a comma-separated list of {@code key -> value}
+     * entries. A key that occurs several times keeps the value of its last entry.
+     */
+    private Map<Object, Object> parseMapEntry(
+            String content,
+            CastExecutor<BinaryString, Object> keyCastExecutor,
+            CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+        Map<Object, Object> mapContent = Maps.newHashMap();
+        for (String entry : splitMapEntries(content)) {
+            Matcher entryMatcher = ENTRY_PATTERN.matcher(entry);
+            if (!entryMatcher.matches()) {
+                throw new RuntimeException("Invalid map entry format: " + entry);
+            }
+            mapContent.put(
+                    parseValue(entryMatcher.group(1).trim(), keyCastExecutor),
+                    parseValue(entryMatcher.group(2).trim(), valueCastExecutor));
+        }
+        return mapContent;
+    }
+
+    /** Converts one key or value token; the literal {@code null} yields a null reference. */
+    private Object parseValue(String valueStr, CastExecutor<BinaryString, Object> castExecutor) {
+        return "null".equals(valueStr)
+                ? null
+                : castExecutor.cast(BinaryString.fromString(valueStr));
+    }
+
+    /**
+     * Splits {@code content} at commas that are outside double quotes and outside any bracket
+     * pair. A backslash keeps the following character from being interpreted; both characters
+     * stay in the token. Zero-length pieces are dropped.
+     */
+    private List<String> splitMapEntries(String content) {
+        List<String> entries = new ArrayList<>();
+        StringBuilder current = new StringBuilder();
+        Stack<Character> bracketStack = new Stack<>();
+        boolean inQuotes = false;
+        boolean escaped = false;
+
+        for (char c : content.toCharArray()) {
+            if (escaped) {
+                escaped = false;
+            } else if (c == '\\') {
+                escaped = true;
+            } else if (c == '"') {
+                inQuotes = !inQuotes;
+            } else if (!inQuotes) {
+                if (StringUtils.isOpenBracket(c)) {
+                    bracketStack.push(c);
+                } else if (StringUtils.isCloseBracket(c) && !bracketStack.isEmpty()) {
+                    bracketStack.pop();
+                } else if (c == ',' && bracketStack.isEmpty()) {
+                    // Top-level separator: finish the entry and drop the comma.
+                    addCurrentEntry(entries, current);
+                    continue;
+                }
+            }
+            current.append(c);
+        }
+
+        addCurrentEntry(entries, current);
+        return entries;
+    }
+
+    /** Appends the buffered text to {@code entries} if it is non-empty, then clears the buffer. */
+    private void addCurrentEntry(List<String> entries, StringBuilder current) {
+        if (current.length() > 0) {
+            entries.add(current.toString());
+            current.setLength(0);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java
 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java
new file mode 100644
index 0000000000..5a2379d273
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#ROW} cast rule.
+ *
+ * <p>Two textual forms are understood: the bracket form {@code {f1, f2}} and the function form
+ * {@code STRUCT(f1, f2)} (keyword matched case-insensitively). Field texts are assigned to the
+ * row's fields by position, trimmed, and converted with the cast executor resolved for each field
+ * type; the literal token {@code null} produces a null field. An empty literal yields a row whose
+ * fields are all null.
+ */
+class StringToRowCastRule extends AbstractCastRule<BinaryString, InternalRow> {
+
+    static final StringToRowCastRule INSTANCE = new StringToRowCastRule();
+
+    // Bracket syntax: {field1, field2, field3}
+    private static final Pattern BRACKET_ROW_PATTERN = Pattern.compile("^\\s*\\{(.*)\\}\\s*$");
+
+    // SQL function syntax: STRUCT(field1, field2, field3)
+    private static final Pattern FUNCTION_ROW_PATTERN =
+            Pattern.compile("^\\s*STRUCT\\s*\\((.*)\\)\\s*$", Pattern.CASE_INSENSITIVE);
+
+    private StringToRowCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(DataTypeFamily.CHARACTER_STRING)
+                        .target(DataTypeRoot.ROW)
+                        .build());
+    }
+
+    /**
+     * Builds the executor for one concrete row type.
+     *
+     * @throws RuntimeException if no string cast exists for one of the field types
+     */
+    @Override
+    public CastExecutor<BinaryString, InternalRow> create(DataType inputType, DataType targetType) {
+        RowType rowType = (RowType) targetType;
+        CastExecutor<BinaryString, Object>[] perFieldCasters = createFieldCastExecutors(rowType);
+        return value -> parseRow(value, perFieldCasters, rowType.getFieldCount());
+    }
+
+    /** Resolves one string cast executor per field, in field order. */
+    private CastExecutor<BinaryString, Object>[] createFieldCastExecutors(RowType rowType) {
+        @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object>[] casters =
+                new CastExecutor[rowType.getFieldCount()];
+
+        for (int pos = 0; pos < casters.length; pos++) {
+            DataType fieldType = rowType.getTypeAt(pos);
+            @SuppressWarnings("unchecked")
+            CastExecutor<BinaryString, Object> resolved =
+                    (CastExecutor<BinaryString, Object>)
+                            CastExecutors.resolve(VarCharType.STRING_TYPE, fieldType);
+            if (resolved == null) {
+                throw new RuntimeException("Cannot cast string to row field type: " + fieldType);
+            }
+            casters[pos] = resolved;
+        }
+        return casters;
+    }
+
+    /** Parses the whole literal; any failure is rethrown with the offending input attached. */
+    private InternalRow parseRow(
+            BinaryString value,
+            CastExecutor<BinaryString, Object>[] fieldCastExecutors,
+            int fieldCount) {
+        try {
+            String text = value.toString().trim();
+            // The two canonical empty literals skip pattern matching altogether.
+            boolean literalEmpty = "{}".equals(text) || "STRUCT()".equalsIgnoreCase(text);
+            String content = literalEmpty ? "" : extractRowContent(text);
+            if (content.isEmpty()) {
+                return createNullRow(fieldCount);
+            }
+
+            List<String> fieldTexts = splitRowFields(content);
+            int actualCount = fieldTexts.size();
+            if (actualCount != fieldCount) {
+                throw new RuntimeException(
+                        "Row field count mismatch. Expected: "
+                                + fieldCount
+                                + ", Actual: "
+                                + actualCount);
+            }
+            return createRowFromFields(fieldTexts, fieldCastExecutors, fieldCount);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot parse '" + value + "' as ROW: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the trimmed text between the outer delimiters, trying the bracket form before the
+     * function form.
+     */
+    private String extractRowContent(String str) {
+        for (Pattern pattern : new Pattern[] {BRACKET_ROW_PATTERN, FUNCTION_ROW_PATTERN}) {
+            Matcher matcher = pattern.matcher(str);
+            if (matcher.matches()) {
+                return matcher.group(1).trim();
+            }
+        }
+        throw new RuntimeException(
+                "Invalid row format: " + str + ". Expected format: {f1, f2} or STRUCT(f1, f2)");
+    }
+
+    /** Creates a row of the given arity with every field explicitly set to null. */
+    private GenericRow createNullRow(int fieldCount) {
+        GenericRow emptyRow = new GenericRow(fieldCount);
+        int pos = 0;
+        while (pos < fieldCount) {
+            emptyRow.setField(pos, null);
+            pos++;
+        }
+        return emptyRow;
+    }
+
+    /** Converts the positional field texts and stores them into a new row. */
+    private GenericRow createRowFromFields(
+            List<String> fieldValues,
+            CastExecutor<BinaryString, Object>[] fieldCastExecutors,
+            int fieldCount) {
+        GenericRow result = new GenericRow(fieldCount);
+        for (int pos = 0; pos < fieldCount; pos++) {
+            String trimmed = fieldValues.get(pos).trim();
+            result.setField(pos, parseFieldValue(trimmed, fieldCastExecutors[pos]));
+        }
+        return result;
+    }
+
+    /** Converts one field token; the literal {@code null} yields a null reference. */
+    private Object parseFieldValue(
+            String fieldValue, CastExecutor<BinaryString, Object> castExecutor) {
+        if ("null".equals(fieldValue)) {
+            return null;
+        }
+        return castExecutor.cast(BinaryString.fromString(fieldValue));
+    }
+
+    /**
+     * Splits {@code content} at commas that are outside double quotes and outside any bracket
+     * pair. A backslash keeps the following character from being interpreted; both characters
+     * stay in the token. Zero-length pieces are dropped.
+     */
+    private List<String> splitRowFields(String content) {
+        List<String> pieces = new ArrayList<>();
+        StringBuilder buffer = new StringBuilder();
+        Stack<Character> openBrackets = new Stack<>();
+        boolean quoted = false;
+        boolean afterBackslash = false;
+
+        for (int i = 0; i < content.length(); i++) {
+            char ch = content.charAt(i);
+            if (afterBackslash) {
+                afterBackslash = false;
+                buffer.append(ch);
+                continue;
+            }
+            if (ch == '\\') {
+                afterBackslash = true;
+                buffer.append(ch);
+                continue;
+            }
+            if (ch == '"') {
+                quoted = !quoted;
+                buffer.append(ch);
+                continue;
+            }
+            if (quoted) {
+                buffer.append(ch);
+                continue;
+            }
+            if (ch == ',' && openBrackets.isEmpty()) {
+                // Top-level separator: close the current field; the comma itself is discarded.
+                flushField(pieces, buffer);
+                continue;
+            }
+            if (StringUtils.isOpenBracket(ch)) {
+                openBrackets.push(ch);
+            } else if (StringUtils.isCloseBracket(ch) && !openBrackets.isEmpty()) {
+                openBrackets.pop();
+            }
+            buffer.append(ch);
+        }
+
+        flushField(pieces, buffer);
+        return pieces;
+    }
+
+    /** Moves the buffered text into {@code pieces} (if there is any) and resets the buffer. */
+    private void flushField(List<String> pieces, StringBuilder buffer) {
+        if (buffer.length() == 0) {
+            return;
+        }
+        pieces.add(buffer.toString());
+        buffer.setLength(0);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java 
b/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
index cb3e14c536..457895f839 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
@@ -22,6 +22,9 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -54,6 +57,7 @@ import java.util.TimeZone;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Test for {@link CastExecutor}. */
 public class CastExecutorTest {
@@ -569,6 +573,151 @@ public class CastExecutorTest {
                 "12345678".getBytes());
     }
 
+    @Test
+    public void testStringToArray() {
+        // STRING -> ARRAY<INT>
+        CastExecutor<BinaryString, InternalArray> toIntArray =
+                (CastExecutor<BinaryString, InternalArray>)
+                        CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.INT()));
+
+        InternalArray ints = toIntArray.cast(BinaryString.fromString("[1, 2, 3]"));
+        assertThat(ints.size()).isEqualTo(3);
+        for (int pos = 0; pos < 3; pos++) {
+            assertThat(ints.getInt(pos)).isEqualTo(pos + 1);
+        }
+
+        // "[]" produces an array without elements
+        InternalArray emptyInts = toIntArray.cast(BinaryString.fromString("[]"));
+        assertThat(emptyInts.size()).isEqualTo(0);
+
+        // STRING -> ARRAY<STRING>
+        CastExecutor<BinaryString, InternalArray> toStringArray =
+                (CastExecutor<BinaryString, InternalArray>)
+                        CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.STRING()));
+
+        InternalArray words = toStringArray.cast(BinaryString.fromString("[hello, world, test]"));
+        String[] expectedWords = {"hello", "world", "test"};
+        assertThat(words.size()).isEqualTo(3);
+        for (int pos = 0; pos < expectedWords.length; pos++) {
+            assertThat(words.getString(pos).toString()).isEqualTo(expectedWords[pos]);
+        }
+
+        // A null token in the literal becomes a null element
+        InternalArray intsWithNull = toIntArray.cast(BinaryString.fromString("[1, null, 3]"));
+        assertThat(intsWithNull.size()).isEqualTo(3);
+        assertThat(intsWithNull.getInt(0)).isEqualTo(1);
+        assertThat(intsWithNull.isNullAt(1)).isTrue();
+        assertThat(intsWithNull.getInt(2)).isEqualTo(3);
+    }
+
+    /**
+     * Casts string literals of the form <code>{k -&gt; v, ...}</code> to MAP&lt;STRING, INT&gt;
+     * and checks the resulting entries. Entries are matched by key rather than by position, so
+     * the test does not depend on the order in which the produced map exposes them.
+     */
+    @Test
+    public void testStringToMap() {
+        // Test string to map<string, int>
+        CastExecutor<BinaryString, InternalMap> stringToMap =
+                (CastExecutor<BinaryString, InternalMap>)
+                        CastExecutors.resolve(
+                                VarCharType.STRING_TYPE,
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        InternalMap result = stringToMap.cast(BinaryString.fromString("{key1 -> 1, key2 -> 2}"));
+        assertThat(result.size()).isEqualTo(2);
+
+        InternalArray keyArray = result.keyArray();
+        InternalArray valueArray = result.valueArray();
+        String[] keys = new String[result.size()];
+        for (int i = 0; i < keys.length; i++) {
+            keys[i] = keyArray.getString(i).toString();
+            // The value at index i belongs to the key at index i.
+            assertThat(valueArray.getInt(i)).isEqualTo("key1".equals(keys[i]) ? 1 : 2);
+        }
+        assertThat(keys).containsExactlyInAnyOrder("key1", "key2");
+
+        // Test empty map
+        result = stringToMap.cast(BinaryString.fromString("{}"));
+        assertThat(result.size()).isEqualTo(0);
+
+        // Test map with null values
+        result = stringToMap.cast(BinaryString.fromString("{key1 -> null, key2 -> 42}"));
+        assertThat(result.size()).isEqualTo(2);
+        keyArray = result.keyArray();
+        valueArray = result.valueArray();
+        keys = new String[result.size()];
+        for (int i = 0; i < keys.length; i++) {
+            keys[i] = keyArray.getString(i).toString();
+            if ("key1".equals(keys[i])) {
+                assertThat(valueArray.isNullAt(i)).isTrue();
+            } else {
+                assertThat(valueArray.getInt(i)).isEqualTo(42);
+            }
+        }
+        assertThat(keys).containsExactlyInAnyOrder("key1", "key2");
+    }
+
+    @Test
+    public void testStringToRow() {
+        // Target type: ROW<INT, STRING, BOOLEAN>
+        RowType targetType = RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.BOOLEAN());
+        CastExecutor<BinaryString, InternalRow> toRow =
+                (CastExecutor<BinaryString, InternalRow>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, targetType);
+
+        // Every field populated
+        InternalRow populated = toRow.cast(BinaryString.fromString("{42, hello, true}"));
+        assertThat(populated.getFieldCount()).isEqualTo(3);
+        assertThat(populated.getInt(0)).isEqualTo(42);
+        assertThat(populated.getString(1).toString()).isEqualTo("hello");
+        assertThat(populated.getBoolean(2)).isTrue();
+
+        // "{}" keeps the arity of the target type, with all fields null
+        InternalRow allNull = toRow.cast(BinaryString.fromString("{}"));
+        assertThat(allNull.getFieldCount()).isEqualTo(3);
+        for (int pos = 0; pos < 3; pos++) {
+            assertThat(allNull.isNullAt(pos)).isTrue();
+        }
+
+        // A null token in the literal becomes a null field
+        InternalRow leadingNull = toRow.cast(BinaryString.fromString("{null, test, false}"));
+        assertThat(leadingNull.getFieldCount()).isEqualTo(3);
+        assertThat(leadingNull.isNullAt(0)).isTrue();
+        assertThat(leadingNull.getString(1).toString()).isEqualTo("test");
+        assertThat(leadingNull.getBoolean(2)).isFalse();
+    }
+
+    @Test
+    public void testStringToComplexTypesErrorHandling() {
+        // ARRAY: the closing bracket is missing
+        CastExecutor<BinaryString, InternalArray> toIntArray =
+                (CastExecutor<BinaryString, InternalArray>)
+                        CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.INT()));
+
+        try {
+            toIntArray.cast(BinaryString.fromString("[1, 2, 3"));
+            fail("Expected RuntimeException for invalid array format");
+        } catch (RuntimeException arrayFailure) {
+            assertThat(arrayFailure.getMessage()).contains("Cannot parse").contains("as ARRAY");
+        }
+
+        // MAP: the second entry has no value part
+        CastExecutor<BinaryString, InternalMap> toMap =
+                (CastExecutor<BinaryString, InternalMap>)
+                        CastExecutors.resolve(
+                                VarCharType.STRING_TYPE,
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        try {
+            toMap.cast(BinaryString.fromString("{key1 -> 1, key2"));
+            fail("Expected RuntimeException for invalid map format");
+        } catch (RuntimeException mapFailure) {
+            assertThat(mapFailure.getMessage()).contains("Cannot parse").contains("as MAP");
+        }
+
+        // ROW: three values supplied for a two-field row type
+        RowType twoFields = RowType.of(DataTypes.INT(), DataTypes.STRING());
+        CastExecutor<BinaryString, InternalRow> toRow =
+                (CastExecutor<BinaryString, InternalRow>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, twoFields);
+
+        try {
+            toRow.cast(BinaryString.fromString("{42, hello, extra}"));
+            fail("Expected RuntimeException for field count mismatch");
+        } catch (RuntimeException rowFailure) {
+            assertThat(rowFailure.getMessage()).contains("field count mismatch");
+        }
+    }
+
     @Test
     public void testBinaryToString() {
         // binary(5) to string(10)
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java
new file mode 100644
index 0000000000..9dcb1061fc
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link DefaultValueRow} with complex data types support. */
+public class DefaultValueRowTest {
+
+    /**
+     * ARRAY columns whose field is null in the wrapped row must expose the parsed default,
+     * element by element.
+     */
+    @Test
+    public void testDefaultValueRowWithArrayType() {
+        // Test with Array default value
+        RowType rowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(
+                                1,
+                                "tags",
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                "Default tags",
+                                "[tag1, tag2, tag3]"),
+                        new DataField(
+                                2,
+                                "numbers",
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                "Default numbers",
+                                "[1, 2, 3]"));
+
+        DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+        GenericRow originalRow = new GenericRow(3);
+        originalRow.setField(0, 100);
+        originalRow.setField(1, null); // Use default tags
+        originalRow.setField(2, null); // Use default numbers
+
+        DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+        assertThat(wrappedRow.getInt(0)).isEqualTo(100);
+
+        // Check default array values, not merely that something non-null came back
+        InternalArray tagsValue = wrappedRow.getArray(1);
+        assertThat(tagsValue).isNotNull();
+        assertThat(tagsValue.size()).isEqualTo(3);
+        assertThat(tagsValue.getString(0).toString()).isEqualTo("tag1");
+        assertThat(tagsValue.getString(1).toString()).isEqualTo("tag2");
+        assertThat(tagsValue.getString(2).toString()).isEqualTo("tag3");
+
+        InternalArray numbersValue = wrappedRow.getArray(2);
+        assertThat(numbersValue).isNotNull();
+        assertThat(numbersValue.size()).isEqualTo(3);
+        assertThat(numbersValue.getInt(0)).isEqualTo(1);
+        assertThat(numbersValue.getInt(1)).isEqualTo(2);
+        assertThat(numbersValue.getInt(2)).isEqualTo(3);
+    }
+
+    /**
+     * MAP columns whose field is null in the wrapped row must expose the parsed default.
+     * Entries are matched by key so the check does not depend on entry order.
+     */
+    @Test
+    public void testDefaultValueRowWithMapType() {
+        // Test with Map default value
+        RowType rowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(
+                                1,
+                                "properties",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                                "Default properties",
+                                "{key1 -> value1, key2 -> value2}"));
+
+        DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+        GenericRow originalRow = new GenericRow(2);
+        originalRow.setField(0, 200);
+        originalRow.setField(1, null); // Use default properties
+
+        DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+        assertThat(wrappedRow.getInt(0)).isEqualTo(200);
+
+        InternalMap propertiesValue = wrappedRow.getMap(1);
+        assertThat(propertiesValue).isNotNull();
+        assertThat(propertiesValue.size()).isEqualTo(2);
+
+        InternalArray keyArray = propertiesValue.keyArray();
+        InternalArray valueArray = propertiesValue.valueArray();
+        String[] keys = new String[propertiesValue.size()];
+        for (int i = 0; i < keys.length; i++) {
+            keys[i] = keyArray.getString(i).toString();
+            // The value at index i belongs to the key at index i.
+            String expectedValue = "key1".equals(keys[i]) ? "value1" : "value2";
+            assertThat(valueArray.getString(i).toString()).isEqualTo(expectedValue);
+        }
+        assertThat(keys).containsExactlyInAnyOrder("key1", "key2");
+    }
+
+    /**
+     * ROW columns whose field is null in the wrapped row must expose the parsed default,
+     * field by field.
+     */
+    @Test
+    public void testDefaultValueRowWithRowType() {
+        // Test with Row default value
+        RowType innerRowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
+        RowType rowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(
+                                1,
+                                "nested",
+                                innerRowType,
+                                "Default nested",
+                                "{42, default_value}"));
+
+        DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+        GenericRow originalRow = new GenericRow(2);
+        originalRow.setField(0, 300);
+        originalRow.setField(1, null); // Use default nested row
+
+        DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+        assertThat(wrappedRow.getInt(0)).isEqualTo(300);
+
+        InternalRow nestedValue = wrappedRow.getRow(1, 2);
+        assertThat(nestedValue).isNotNull();
+        assertThat(nestedValue.getFieldCount()).isEqualTo(2);
+        assertThat(nestedValue.getInt(0)).isEqualTo(42);
+        assertThat(nestedValue.getString(1).toString()).isEqualTo("default_value");
+    }
+
+    /**
+     * Default literals whose elements cannot be cast to the declared element/value type must be
+     * rejected when the {@link DefaultValueRow} is created.
+     */
+    @Test
+    public void testDefaultValueRowErrorHandlingForComplexTypes() {
+        // Test error handling for invalid array default
+        RowType rowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(
+                                1,
+                                "invalid_array",
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                "Invalid array",
+                                "[1, 2, invalid]"));
+
+        assertThatThrownBy(() -> DefaultValueRow.create(rowType))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Cannot parse");
+
+        // Test error handling for invalid map default
+        RowType mapRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(
+                                1,
+                                "invalid_map",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                                "Invalid map",
+                                "{key1 -> invalid_value}"));
+
+        assertThatThrownBy(() -> DefaultValueRow.create(mapRowType))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Cannot parse");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index dd7750b2f0..8430ff3e9e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -52,6 +52,108 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
     }
 
+    /**
+     * Sets ARRAY defaults through {@code sys.alter_column_default_value} and checks that rows
+     * inserted with only {@code id} read back with those defaults, for both a non-empty literal
+     * and the empty literal {@code []}.
+     */
+    @Test
+    public void testArrayDefaultValue() throws Exception {
+        sql("CREATE TABLE T_ARRAY (id INT, tags ARRAY<STRING>, numbers 
ARRAY<INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'tags', 
'[tag1, tag2]')");
+        sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'numbers', 
'[1, 2, 3]')");
+
+        // Only id is written; tags and numbers are expected to come from the defaults.
+        sql("INSERT INTO T_ARRAY (id) VALUES (1), (2)");
+        assertThat(collectResult("SELECT * FROM T_ARRAY"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, [tag1, tag2], [1, 2, 3]]", "+I[2, [tag1, tag2], 
[1, 2, 3]]");
+
+        // Empty-array default.
+        sql("CREATE TABLE T_EMPTY_ARRAY (id INT, empty_tags ARRAY<STRING>)");
+        sql("CALL sys.alter_column_default_value('default.T_EMPTY_ARRAY', 
'empty_tags', '[]')");
+        sql("INSERT INTO T_EMPTY_ARRAY (id) VALUES (1)");
+        assertThat(collectResult("SELECT * FROM T_EMPTY_ARRAY"))
+                .containsExactlyInAnyOrder("+I[1, []]");
+    }
+
+    /**
+     * Sets MAP defaults through {@code sys.alter_column_default_value} using the
+     * {@code {k -> v}} literal form and checks that rows inserted with only {@code id} read back
+     * with those defaults, for both a populated map and the empty literal {@code {}}.
+     */
+    @Test
+    public void testMapDefaultValue() throws Exception {
+        sql("CREATE TABLE T_MAP (id INT, properties MAP<STRING, STRING>)");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_MAP', 
'properties', '{key1 -> value1, key2 -> value2}')");
+
+        // Only id is written; properties is expected to come from the default.
+        sql("INSERT INTO T_MAP (id) VALUES (1), (2)");
+        assertThat(collectResult("SELECT * FROM T_MAP"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, {key1=value1, key2=value2}]", "+I[2, 
{key1=value1, key2=value2}]");
+
+        // Empty-map default.
+        sql("CREATE TABLE T_EMPTY_MAP (id INT, empty_map MAP<STRING, INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_EMPTY_MAP', 
'empty_map', '{}')");
+        sql("INSERT INTO T_EMPTY_MAP (id) VALUES (1)");
+        assertThat(collectResult("SELECT * FROM T_EMPTY_MAP"))
+                .containsExactlyInAnyOrder("+I[1, {}]");
+    }
+
+    /**
+     * Sets ROW defaults through {@code sys.alter_column_default_value} using the positional
+     * <code>{v1, v2, ...}</code> literal form and checks that rows inserted with only {@code id}
+     * read back with those defaults, for a two-field and a three-field row type.
+     */
+    @Test
+    public void testRowDefaultValue() throws Exception {
+        sql("CREATE TABLE T_STRUCT (id INT, nested ROW<x INT, y STRING>)");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_STRUCT', 
'nested', '{42, default_value}')");
+
+        // Only id is written; nested is expected to come from the default.
+        sql("INSERT INTO T_STRUCT (id) VALUES (1), (2)");
+        assertThat(collectResult("SELECT * FROM T_STRUCT"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, +I[42, default_value]]", "+I[2, +I[42, 
default_value]]");
+
+        // Row default mixing BOOLEAN, INT and STRING fields.
+        sql(
+                "CREATE TABLE T_COMPLEX_STRUCT (id INT, config ROW<enabled 
BOOLEAN, timeout INT, name STRING>)");
+        sql(
+                "CALL 
sys.alter_column_default_value('default.T_COMPLEX_STRUCT', 'config', '{true, 
30, config_name}')");
+        sql("INSERT INTO T_COMPLEX_STRUCT (id) VALUES (1)");
+        assertThat(collectResult("SELECT * FROM T_COMPLEX_STRUCT"))
+                .containsExactlyInAnyOrder("+I[1, +I[true, 30, config_name]]");
+    }
+
+    /**
+     * Combines a scalar default with ARRAY, MAP and ROW defaults on one table and checks that
+     * each omitted column is filled from its own default, while an explicitly supplied
+     * {@code name} is kept.
+     */
+    @Test
+    public void testMixedComplexDefaultValue() throws Exception {
+        sql(
+                "CREATE TABLE T_MIXED (id INT, name STRING, tags 
ARRAY<STRING>, metadata MAP<STRING, STRING>, config ROW<enabled BOOLEAN, 
timeout INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'name', 
'default_name')");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'tags', 
'[default_tag]')");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_MIXED', 
'metadata', '{created_by -> system}')");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'config', 
'{true, 30}')");
+
+        // All four defaulted columns are omitted.
+        sql("INSERT INTO T_MIXED (id) VALUES (1)");
+        assertThat(collectResult("SELECT * FROM T_MIXED"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, default_name, [default_tag], 
{created_by=system}, +I[true, 30]]");
+
+        // name is supplied explicitly; the remaining columns still use their defaults.
+        sql("INSERT INTO T_MIXED (id, name) VALUES (2, 'custom_name')");
+        assertThat(collectResult("SELECT * FROM T_MIXED WHERE id = 2"))
+                .containsExactlyInAnyOrder(
+                        "+I[2, custom_name, [default_tag], 
{created_by=system}, +I[true, 30]]");
+    }
+
+    /**
+     * Checks defaults for nested complex types: an ARRAY of ROWs and a ROW containing another
+     * ROW. The default literals nest the bracket/brace forms, e.g.
+     * <code>[{item1, 10}, {item2, 20}]</code> and <code>{{42, nested_value}, true}</code>.
+     */
+    @Test
+    public void testNestedComplexDefaultValue() throws Exception {
+        sql(
+                "CREATE TABLE T_NESTED ("
+                        + "id INT, "
+                        + "nested_array ARRAY<ROW<name STRING, val INT>>, "
+                        + "nested_struct ROW<info ROW<x INT, y STRING>, 
enabled BOOLEAN>"
+                        + ")");
+
+        sql(
+                "CALL sys.alter_column_default_value('default.T_NESTED', 
'nested_array', '[{item1, 10}, {item2, 20}]')");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_NESTED', 
'nested_struct', '{{42, nested_value}, true}')");
+
+        // Both nested columns are omitted and expected to come from the defaults.
+        sql("INSERT INTO T_NESTED (id) VALUES (1)");
+        assertThat(collectResult("SELECT * FROM T_NESTED"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, [+I[item1, 10], +I[item2, 20]], +I[+I[42, 
nested_value], true]]");
+
+        // nested_array is supplied explicitly; nested_struct still uses its default.
+        sql("INSERT INTO T_NESTED (id, nested_array) VALUES (2, 
ARRAY[ROW('custom', 99)])");
+        assertThat(collectResult("SELECT * FROM T_NESTED WHERE id = 2"))
+                .containsExactlyInAnyOrder(
+                        "+I[2, [+I[custom, 99]], +I[+I[42, nested_value], 
true]]");
+    }
+
     @Test
     public void testDefaultValueForPkTableDynamicBucket() throws Exception {
         sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT)");
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 237e1ac135..51c2e63b5f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -128,6 +128,216 @@ public class SparkWriteITCase {
                         "[[1,2,my_value], [2,2,my_value], [3,2,my_value], 
[4,2,my_value], [5,3,my_value]]");
     }
 
+    /**
+     * Exercises ARRAY column defaults declared with Spark's {@code DEFAULT ARRAY(...)} clause:
+     * the clause must appear in SHOW CREATE TABLE, and the default must be used both when the
+     * column is left out of the INSERT column list and when the DEFAULT keyword is written. The
+     * write checks are then repeated for an empty {@code ARRAY()} default.
+     *
+     * <p>NOTE(review): the expected strings contain the single quotes ({@code 'tag1'}), which
+     * suggests the stored elements keep the SQL literal's quoting - confirm this is intended.
+     */
+    @Test
+    public void testWriteWithArrayDefaultValue() {
+        // Test Array type default value - using Spark SQL function syntax
+        spark.sql(
+                "CREATE TABLE T (id INT, tags ARRAY<STRING> DEFAULT 
ARRAY('tag1', 'tag2'), numbers ARRAY<INT> DEFAULT ARRAY(1, 2, 3)) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table for array
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains("tags ARRAY<STRING> DEFAULT ARRAY('tag1', 'tag2')")
+                .contains("numbers ARRAY<INT> DEFAULT ARRAY(1, 2, 3)");
+
+        // test partial write with array defaults
+        spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        // Support both Spark 3.x (WrappedArray) and Spark 4.x (ArraySeq) 
formats
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('tag1', 
'tag2')"),
+                        s -> assertThat(s).contains("ArraySeq('tag1', 
'tag2')"));
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray(1, 2, 3)"),
+                        s -> assertThat(s).contains("ArraySeq(1, 2, 3)"));
+        assertThat(rows.size()).isEqualTo(2);
+
+        // test write with DEFAULT keyword for arrays
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, 
DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('tag1', 
'tag2')"),
+                        s -> assertThat(s).contains("ArraySeq('tag1', 
'tag2')"));
+        assertThat(rows.size()).isEqualTo(3);
+
+        // test empty array default value
+        spark.sql("DROP TABLE IF EXISTS T");
+        spark.sql(
+                "CREATE TABLE T (id INT, empty_array ARRAY<STRING> DEFAULT 
ARRAY()) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray()"),
+                        s -> assertThat(s).contains("ArraySeq()"));
+
+        // test write with DEFAULT keyword for empty array
+        spark.sql("INSERT INTO T VALUES (2, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray()"),
+                        s -> assertThat(s).contains("ArraySeq()"));
+        assertThat(rows.size()).isEqualTo(2);
+    }
+
+    /**
+     * Exercises MAP column defaults declared with Spark's {@code DEFAULT MAP(...)} clause: the
+     * clause must appear in SHOW CREATE TABLE, and the default must be used both when the column
+     * is left out of the INSERT column list and when the DEFAULT keyword is written. The write
+     * checks are then repeated for an empty {@code MAP()} default.
+     *
+     * <p>NOTE(review): the expected entries contain the single quotes
+     * ({@code 'key1' -> 'value1'}), which suggests keys and values keep the SQL literal's
+     * quoting - confirm this is intended.
+     */
+    @Test
+    public void testWriteWithMapDefaultValue() {
+        // Test Map type default value - using Spark SQL function syntax
+        spark.sql(
+                "CREATE TABLE T (id INT, properties MAP<STRING, STRING> 
DEFAULT MAP('key1', 'value1', 'key2', 'value2')) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table for map
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains(
+                        "properties MAP<STRING, STRING> DEFAULT MAP('key1', 
'value1', 'key2', 'value2')");
+
+        // test partial write with map defaults
+        spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("'key1' -> 'value1'");
+        assertThat(rows.toString()).contains("'key2' -> 'value2'");
+        assertThat(rows.size()).isEqualTo(2);
+
+        // test write with DEFAULT keyword for map
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("'key1' -> 'value1'");
+        assertThat(rows.toString()).contains("'key2' -> 'value2'");
+        assertThat(rows.size()).isEqualTo(3);
+
+        // test empty map default value
+        spark.sql("DROP TABLE IF EXISTS T");
+        spark.sql(
+                "CREATE TABLE T (id INT, empty_map MAP<STRING, INT> DEFAULT 
MAP()) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("Map()");
+
+        // test write with DEFAULT keyword for empty map
+        spark.sql("INSERT INTO T VALUES (2, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("Map()");
+        assertThat(rows.size()).isEqualTo(2);
+    }
+
+    /**
+     * Exercises STRUCT column defaults declared with Spark's {@code DEFAULT STRUCT(...)} clause:
+     * the clause must appear in SHOW CREATE TABLE, and the default must be used both when the
+     * column is left out of the INSERT column list and when the DEFAULT keyword is written. A
+     * second table repeats the partial-write check for a three-field struct.
+     *
+     * <p>NOTE(review): the expected strings contain the single quotes
+     * ({@code [42,'default_value']}), which suggests string fields keep the SQL literal's
+     * quoting - confirm this is intended.
+     */
+    @Test
+    public void testWriteWithStructDefaultValue() {
+        // Test Struct/Row type default value - using Spark SQL function syntax
+        spark.sql(
+                "CREATE TABLE T (id INT, nested STRUCT<x: INT, y: STRING> 
DEFAULT STRUCT(42, 'default_value')) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table for struct
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains("nested STRUCT<x: INT, y: STRING> DEFAULT STRUCT(42, 
'default_value')");
+
+        // test partial write with struct defaults
+        spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("[42,'default_value']");
+        assertThat(rows.size()).isEqualTo(2);
+
+        // test write with DEFAULT keyword for struct
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("[42,'default_value']");
+        assertThat(rows.size()).isEqualTo(3);
+
+        // test complex struct with multiple types
+        spark.sql("DROP TABLE IF EXISTS T");
+        spark.sql(
+                "CREATE TABLE T (id INT, config STRUCT<enabled: BOOLEAN, 
timeout: INT, name: STRING> DEFAULT STRUCT(true, 30, 'config_name')) 
TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("[true,30,'config_name']");
+        assertThat(rows.size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testWriteWithNestedComplexDefaultValue() {
+        // Test nested complex types with default values - using Spark SQL 
function syntax
+        spark.sql(
+                "CREATE TABLE T (id INT, "
+                        + "nested_array ARRAY<STRUCT<name: STRING, value: 
INT>> DEFAULT ARRAY(STRUCT('item1', 10), STRUCT('item2', 20)), "
+                        + "map_of_arrays MAP<STRING, ARRAY<INT>> DEFAULT 
MAP('list1', ARRAY(1, 2), 'list2', ARRAY(3, 4))) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table for nested complex types
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains(
+                        "nested_array ARRAY<STRUCT<name: STRING, value: INT>> 
DEFAULT ARRAY(STRUCT('item1', 10), STRUCT('item2', 20))")
+                .contains(
+                        "map_of_arrays MAP<STRING, ARRAY<INT>> DEFAULT 
MAP('list1', ARRAY(1, 2), 'list2', ARRAY(3, 4))");
+
+        // test partial write with nested complex type defaults
+        spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.size()).isEqualTo(1);
+        assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+
+        // test write with DEFAULT keyword for nested complex types
+        spark.sql("INSERT INTO T VALUES (2, DEFAULT, 
DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.size()).isEqualTo(2);
+
+        // test mixed simple and complex types with defaults
+        spark.sql("DROP TABLE IF EXISTS T");
+        spark.sql(
+                "CREATE TABLE T (id INT, "
+                        + "name STRING DEFAULT 'default_name', "
+                        + "tags ARRAY<STRING> DEFAULT ARRAY('default_tag'), "
+                        + "metadata MAP<STRING, STRING> DEFAULT 
MAP('created_by', 'system'), "
+                        + "config STRUCT<enabled: BOOLEAN, timeout: INT> 
DEFAULT STRUCT(true, 30)) TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test partial write with mixed defaults
+        spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("default_name");
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> 
assertThat(s).contains("WrappedArray('default_tag')"),
+                        s -> 
assertThat(s).contains("ArraySeq('default_tag')"));
+        assertThat(rows.toString()).contains("Map('created_by' -> 'system')");
+        assertThat(rows.toString()).contains("[true,30]");
+
+        // test selective column insertion with mixed defaults
+        spark.sql("INSERT INTO T (id, name) VALUES (2, 
'custom_name')").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("custom_name");
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> 
assertThat(s).contains("WrappedArray('default_tag')"),
+                        s -> 
assertThat(s).contains("ArraySeq('default_tag')"));
+        assertThat(rows.size()).isEqualTo(2);
+
+        // test write with some DEFAULT keywords for mixed types
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, ARRAY('custom_tag'), 
DEFAULT, DEFAULT)")
+                .collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).contains("default_name");
+        assertThat(rows.toString())
+                .satisfiesAnyOf(
+                        s -> 
assertThat(s).contains("WrappedArray(custom_tag)"),
+                        s -> assertThat(s).contains("ArraySeq(custom_tag)"));
+        assertThat(rows.size()).isEqualTo(3);
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"order", "zorder", "hilbert"})
     public void testWriteWithClustering(String clusterStrategy) {

Reply via email to