This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d1cbf68bfb [core] Support default values for complex data types (#6046)
d1cbf68bfb is described below
commit d1cbf68bfb91e490d2a5b58875f9780824f0ea64
Author: xiaochen <[email protected]>
AuthorDate: Sat Aug 16 17:24:29 2025 +0800
[core] Support default values for complex data types (#6046)
---
docs/content/flink/default-value.md | 14 +-
docs/content/spark/default-value.md | 34 +++-
.../java/org/apache/paimon/utils/StringUtils.java | 8 +
.../org/apache/paimon/casting/CastExecutors.java | 4 +
.../paimon/casting/StringToArrayCastRule.java | 165 ++++++++++++++++
.../apache/paimon/casting/StringToMapCastRule.java | 207 ++++++++++++++++++++
.../apache/paimon/casting/StringToRowCastRule.java | 196 +++++++++++++++++++
.../apache/paimon/casting/CastExecutorTest.java | 149 +++++++++++++++
.../apache/paimon/data/DefaultValueRowTest.java | 159 ++++++++++++++++
.../org/apache/paimon/flink/BranchSqlITCase.java | 102 ++++++++++
.../org/apache/paimon/spark/SparkWriteITCase.java | 210 +++++++++++++++++++++
11 files changed, 1244 insertions(+), 4 deletions(-)
diff --git a/docs/content/flink/default-value.md b/docs/content/flink/default-value.md
index 9030d6f14b..3da7ef5e7e 100644
--- a/docs/content/flink/default-value.md
+++ b/docs/content/flink/default-value.md
@@ -37,7 +37,10 @@ Flink SQL does not have native support for default values, so we can only create
CREATE TABLE my_table (
a BIGINT,
b STRING,
- c INT
+ c INT,
+ tags ARRAY<STRING>,
+ properties MAP<STRING, STRING>,
+ nested ROW<x INT, y STRING>
);
```
@@ -45,8 +48,14 @@ We support the procedure of modifying column default values in Flink. You can ad
creating the table:
```sql
+-- Set simple type default values
CALL sys.alter_column_default_value('default.my_table', 'b', 'my_value');
CALL sys.alter_column_default_value('default.my_table', 'c', '5');
+
+-- Set complex type default values
+CALL sys.alter_column_default_value('default.my_table', 'tags', '[tag1, tag2, tag3]');
+CALL sys.alter_column_default_value('default.my_table', 'properties', '{key1 -> value1, key2 -> value2}');
+CALL sys.alter_column_default_value('default.my_table', 'nested', '{42, default_value}');
```
## Insert Table
@@ -60,5 +69,6 @@ For example:
INSERT INTO my_table (a) VALUES (1), (2);
SELECT * FROM my_table;
--- result: [[1, 5, my_value], [2, 5, my_value]]
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {key1=value1, key2=value2}, +I[42, default_value]],
+--         [2, my_value, 5, [tag1, tag2, tag3], {key1=value1, key2=value2}, +I[42, default_value]]]
```
diff --git a/docs/content/spark/default-value.md b/docs/content/spark/default-value.md
index ba5423133e..a8b4f3c4e6 100644
--- a/docs/content/spark/default-value.md
+++ b/docs/content/spark/default-value.md
@@ -37,7 +37,10 @@ You can create a table with columns with default values using the following SQL:
CREATE TABLE my_table (
a BIGINT,
b STRING DEFAULT 'my_value',
- c INT DEFAULT 5
+ c INT DEFAULT 5,
+ tags ARRAY<STRING> DEFAULT ARRAY('tag1', 'tag2', 'tag3'),
+    properties MAP<STRING, STRING> DEFAULT MAP('key1', 'value1', 'key2', 'value2'),
+ nested STRUCT<x: INT, y: STRING> DEFAULT STRUCT(42, 'default_value')
);
```
@@ -46,6 +49,16 @@ CREATE TABLE my_table (
For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, and `MERGE` commands, the `DEFAULT` keyword
or `NULL` value is parsed into the default value specified for the corresponding column.
+For example:
+
+```sql
+INSERT INTO my_table (a) VALUES (1), (2);
+
+SELECT * FROM my_table;
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}]]
+```
+
## Alter Default Value
Paimon supports alter column default value.
@@ -64,7 +77,24 @@ INSERT INTO T (a) VALUES (2);
-- result: [[1, 2], [2, 3]]
```
-The default value of `'b'` column has been changed to 3 from 2.
+The default value of the `'b'` column has been changed from 2 to 3. You can also alter default values for complex types:
+
+```sql
+ALTER TABLE my_table ALTER COLUMN tags SET DEFAULT ARRAY('new_tag1', 'new_tag2');
+
+INSERT INTO my_table (a) VALUES (3);
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [3, my_value, 5, [new_tag1, new_tag2], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}]]
+
+ALTER TABLE my_table ALTER COLUMN properties SET DEFAULT MAP('new_key', 'new_value');
+
+INSERT INTO my_table (a) VALUES (4);
+-- result: [[1, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [2, my_value, 5, [tag1, tag2, tag3], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [3, my_value, 5, [new_tag1, new_tag2], {'key1' -> 'value1', 'key2' -> 'value2'}, {42, default_value}],
+--         [4, my_value, 5, [new_tag1, new_tag2], {'new_key' -> 'new_value'}, {42, default_value}]]
+```
## Limitation
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
index ca32b50b68..e20151f35e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -554,4 +554,12 @@ public class StringUtils {
}
return value.toLowerCase();
}
+
+ public static boolean isOpenBracket(char c) {
+ return c == '[' || c == '{' || c == '(';
+ }
+
+ public static boolean isCloseBracket(char c) {
+ return c == ']' || c == '}' || c == ')';
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
index 5971f80c4d..6822c2c3ce 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
@@ -70,6 +70,10 @@ public class CastExecutors {
.addRule(StringToTimeCastRule.INSTANCE)
.addRule(StringToTimestampCastRule.INSTANCE)
.addRule(StringToBinaryCastRule.INSTANCE)
+ .addRule(StringToArrayCastRule.INSTANCE)
+ .addRule(StringToMapCastRule.INSTANCE)
+ .addRule(StringToRowCastRule.INSTANCE)
+
// Date/Time/Timestamp rules
.addRule(TimestampToTimestampCastRule.INSTANCE)
.addRule(TimestampToDateCastRule.INSTANCE)
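The three new rules plug into the existing resolve-and-cast flow of CastExecutors. A minimal usage sketch, mirroring the new CastExecutorTest cases further below:

    // Resolve a STRING -> ARRAY<INT> executor via the newly registered rule and
    // parse a bracket-format default value string.
    CastExecutor<BinaryString, InternalArray> toIntArray =
            (CastExecutor<BinaryString, InternalArray>)
                    CastExecutors.resolve(VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.INT()));
    InternalArray parsed = toIntArray.cast(BinaryString.fromString("[1, 2, 3]"));
    // parsed.getInt(0) == 1, parsed.getInt(2) == 3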
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java b/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java
new file mode 100644
index 0000000000..2e48e0b825
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/StringToArrayCastRule.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#ARRAY} cast rule. */
+class StringToArrayCastRule extends AbstractCastRule<BinaryString, InternalArray> {
+
+ static final StringToArrayCastRule INSTANCE = new StringToArrayCastRule();
+
+ // Pattern for bracket format: [element1, element2, element3]
+    private static final Pattern BRACKET_ARRAY_PATTERN = Pattern.compile("^\\s*\\[(.*)\\]\\s*$");
+
+ // Pattern for SQL function format: ARRAY(element1, element2, element3)
+    private static final Pattern FUNCTION_ARRAY_PATTERN =
+            Pattern.compile("^\\s*ARRAY\\s*\\((.*)\\)\\s*$", Pattern.CASE_INSENSITIVE);
+
+ private StringToArrayCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .input(DataTypeFamily.CHARACTER_STRING)
+ .target(DataTypeRoot.ARRAY)
+ .build());
+ }
+
+ @Override
+    public CastExecutor<BinaryString, InternalArray> create(DataType inputType, DataType targetType) {
+ ArrayType arrayType = (ArrayType) targetType;
+ @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object> elementCastExecutor =
+                (CastExecutor<BinaryString, Object>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, arrayType.getElementType());
+        if (elementCastExecutor == null) {
+            throw new RuntimeException(
+                    "Cannot cast string to array element type: " + arrayType.getElementType());
+ }
+ return value -> parseArray(value, elementCastExecutor);
+ }
+
+    private InternalArray parseArray(
+            BinaryString value, CastExecutor<BinaryString, Object> elementCastExecutor) {
+ try {
+ String str = value.toString().trim();
+ if ("[]".equals(str) || "ARRAY()".equalsIgnoreCase(str)) {
+ return new GenericArray(new Object[0]);
+ }
+
+ String content = extractArrayContent(str);
+ if (content.isEmpty()) {
+ return new GenericArray(new Object[0]);
+ }
+
+            List<Object> elements = parseArrayElements(content, elementCastExecutor);
+ return new GenericArray(elements.toArray());
+ } catch (Exception e) {
+ throw new RuntimeException(
+                    "Cannot parse '" + value + "' as ARRAY: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+     * Extract content from array string, supporting both bracket format [a, b, c] and SQL function
+ * format ARRAY(a, b, c).
+ */
+ private String extractArrayContent(String str) {
+ // Try bracket format first: [element1, element2, element3]
+ Matcher bracketMatcher = BRACKET_ARRAY_PATTERN.matcher(str);
+ if (bracketMatcher.matches()) {
+ return bracketMatcher.group(1).trim();
+ }
+
+ // Try SQL function format: ARRAY(element1, element2, element3)
+ Matcher functionMatcher = FUNCTION_ARRAY_PATTERN.matcher(str);
+ if (functionMatcher.matches()) {
+ return functionMatcher.group(1).trim();
+ }
+
+ throw new RuntimeException(
+                "Invalid array format: " + str + ". Expected format: [a, b, c] or ARRAY(a, b, c)");
+ }
+
+ private List<Object> parseArrayElements(
+            String content, CastExecutor<BinaryString, Object> elementCastExecutor) {
+ List<Object> elements = new ArrayList<>();
+ for (String token : splitArrayElements(content)) {
+ String trimmedToken = token.trim();
+ Object element =
+ "null".equals(trimmedToken)
+ ? null
+                            : elementCastExecutor.cast(BinaryString.fromString(trimmedToken));
+ elements.add(element);
+ }
+ return elements;
+ }
+
+ private List<String> splitArrayElements(String content) {
+ List<String> elements = new ArrayList<>();
+ StringBuilder current = new StringBuilder();
+ Stack<Character> bracketStack = new Stack<>();
+ boolean inQuotes = false;
+ boolean escaped = false;
+
+ for (char c : content.toCharArray()) {
+ if (escaped) {
+ escaped = false;
+ } else if (c == '\\') {
+ escaped = true;
+ } else if (c == '"') {
+ inQuotes = !inQuotes;
+ } else if (!inQuotes) {
+ if (StringUtils.isOpenBracket(c)) {
+ bracketStack.push(c);
+                } else if (StringUtils.isCloseBracket(c) && !bracketStack.isEmpty()) {
+ bracketStack.pop();
+ } else if (c == ',' && bracketStack.isEmpty()) {
+ addCurrentElement(elements, current);
+ continue;
+ }
+ }
+ current.append(c);
+ }
+
+ addCurrentElement(elements, current);
+ return elements;
+ }
+
+    private void addCurrentElement(List<String> elements, StringBuilder current) {
+ if (current.length() > 0) {
+ elements.add(current.toString());
+ current.setLength(0);
+ }
+ }
+}
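Because splitArrayElements only splits on commas outside quotes and brackets, nested defaults survive the top-level split. A minimal sketch, assuming an ARRAY<ROW<STRING, INT>> target as exercised by the Flink ITCase below:

    // Each "{...}" element reaches the STRING -> ROW rule intact (assumed element type).
    DataType elementType = RowType.of(DataTypes.STRING(), DataTypes.INT());
    CastExecutor<BinaryString, InternalArray> toRowArray =
            (CastExecutor<BinaryString, InternalArray>)
                    CastExecutors.resolve(VarCharType.STRING_TYPE, DataTypes.ARRAY(elementType));
    InternalArray rows = toRowArray.cast(BinaryString.fromString("[{item1, 10}, {item2, 20}]"));
    // rows.getRow(0, 2).getString(0) -> "item1"; rows.getRow(1, 2).getInt(1) -> 20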
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java b/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java
new file mode 100644
index 0000000000..cc366605ee
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/StringToMapCastRule.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#MAP} cast rule. */
+class StringToMapCastRule extends AbstractCastRule<BinaryString, InternalMap> {
+
+ static final StringToMapCastRule INSTANCE = new StringToMapCastRule();
+
+ // Pattern for bracket format: {key1 -> value1, key2 -> value2}
+    private static final Pattern BRACKET_MAP_PATTERN = Pattern.compile("^\\s*\\{(.*)\\}\\s*$");
+
+ // Pattern for SQL function format: MAP('key1', 'value1', 'key2', 'value2')
+    private static final Pattern FUNCTION_MAP_PATTERN =
+            Pattern.compile("^\\s*MAP\\s*\\((.*)\\)\\s*$", Pattern.CASE_INSENSITIVE);
+
+    private static final Pattern ENTRY_PATTERN = Pattern.compile("(.+?)\\s*->\\s*(.+)");
+
+ private StringToMapCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .input(DataTypeFamily.CHARACTER_STRING)
+ .target(DataTypeRoot.MAP)
+ .build());
+ }
+
+ @Override
+    public CastExecutor<BinaryString, InternalMap> create(DataType inputType, DataType targetType) {
+        MapType mapType = (MapType) targetType;
+        CastExecutor<BinaryString, Object> keyCastExecutor = createCastExecutor(mapType.getKeyType());
+        CastExecutor<BinaryString, Object> valueCastExecutor = createCastExecutor(mapType.getValueType());
+
+ return value -> parseMap(value, keyCastExecutor, valueCastExecutor);
+ }
+
+ private InternalMap parseMap(
+ BinaryString value,
+ CastExecutor<BinaryString, Object> keyCastExecutor,
+ CastExecutor<BinaryString, Object> valueCastExecutor) {
+ try {
+ String str = value.toString().trim();
+ if ("{}".equals(str) || "MAP()".equalsIgnoreCase(str)) {
+ return new GenericMap(new HashMap<>());
+ }
+            return new GenericMap(parseDefaultMap(str, keyCastExecutor, valueCastExecutor));
+ } catch (Exception e) {
+            throw new RuntimeException("Cannot parse '" + value + "' as MAP: " + e.getMessage(), e);
+ }
+ }
+
+    private CastExecutor<BinaryString, Object> createCastExecutor(DataType targetType) {
+ @SuppressWarnings("unchecked")
+ CastExecutor<BinaryString, Object> executor =
+ (CastExecutor<BinaryString, Object>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, targetType);
+ if (executor == null) {
+            throw new RuntimeException("Cannot cast string to type: " + targetType);
+ }
+ return executor;
+ }
+
+ private Map<Object, Object> parseDefaultMap(
+ String str,
+ CastExecutor<BinaryString, Object> keyCastExecutor,
+ CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+ Map<Object, Object> mapContent = Maps.newHashMap();
+ Matcher bracketMatcher = BRACKET_MAP_PATTERN.matcher(str);
+ if (bracketMatcher.matches()) {
+ // Parse bracket format (arrow-separated entries)
+ String content = bracketMatcher.group(1).trim();
+ return parseMapEntry(content, keyCastExecutor, valueCastExecutor);
+ }
+
+ Matcher functionMatcher = FUNCTION_MAP_PATTERN.matcher(str);
+ if (functionMatcher.matches()) {
+ String functionContent = functionMatcher.group(1).trim();
+            return parseFunctionDefaultMap(functionContent, keyCastExecutor, valueCastExecutor);
+ }
+
+ throw new RuntimeException(
+                "Invalid map format: " + str + ". Expected format: {k -> v} or MAP(k, v)");
+ }
+
+ private Map<Object, Object> parseFunctionDefaultMap(
+ String content,
+ CastExecutor<BinaryString, Object> keyCastExecutor,
+ CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+ List<String> elements = splitMapEntries(content.trim());
+ if (elements.size() % 2 != 0) {
+            throw new RuntimeException("Invalid Function map format: odd number of elements");
+ }
+
+ return IntStream.range(0, elements.size() / 2)
+ .boxed()
+ .collect(
+ Collectors.toMap(
+                                i -> parseValue(elements.get(i * 2).trim(), keyCastExecutor),
+ i ->
+ parseValue(
+ elements.get(i * 2 + 1).trim(),
+ valueCastExecutor)));
+ }
+
+ private Map<Object, Object> parseMapEntry(
+ String content,
+ CastExecutor<BinaryString, Object> keyCastExecutor,
+ CastExecutor<BinaryString, Object> valueCastExecutor) {
+
+ Map<Object, Object> mapContent = Maps.newHashMap();
+ for (String entry : splitMapEntries(content)) {
+ Matcher entryMatcher = ENTRY_PATTERN.matcher(entry);
+ if (!entryMatcher.matches()) {
+                throw new RuntimeException("Invalid map entry format: " + entry);
+ }
+ mapContent.put(
+ parseValue(entryMatcher.group(1).trim(), keyCastExecutor),
+                    parseValue(entryMatcher.group(2).trim(), valueCastExecutor));
+ }
+ return mapContent;
+ }
+
+    private Object parseValue(String valueStr, CastExecutor<BinaryString, Object> castExecutor) {
+ return "null".equals(valueStr)
+ ? null
+ : castExecutor.cast(BinaryString.fromString(valueStr));
+ }
+
+ private List<String> splitMapEntries(String content) {
+ List<String> entries = new ArrayList<>();
+ StringBuilder current = new StringBuilder();
+ Stack<Character> bracketStack = new Stack<>();
+ boolean inQuotes = false;
+ boolean escaped = false;
+
+ for (char c : content.toCharArray()) {
+ if (escaped) {
+ escaped = false;
+ } else if (c == '\\') {
+ escaped = true;
+ } else if (c == '"') {
+ inQuotes = !inQuotes;
+ } else if (!inQuotes) {
+ if (StringUtils.isOpenBracket(c)) {
+ bracketStack.push(c);
+                } else if (StringUtils.isCloseBracket(c) && !bracketStack.isEmpty()) {
+ bracketStack.pop();
+ } else if (c == ',' && bracketStack.isEmpty()) {
+ addCurrentEntry(entries, current);
+ continue;
+ }
+ }
+ current.append(c);
+ }
+
+ addCurrentEntry(entries, current);
+ return entries;
+ }
+
+ private void addCurrentEntry(List<String> entries, StringBuilder current) {
+ if (current.length() > 0) {
+ entries.add(current.toString());
+ current.setLength(0);
+ }
+ }
+}
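Both the arrow form written by the Flink procedure and the MAP(...) function spelling are matched. A minimal sketch of the arrow form (tokens are handed to the key/value executors verbatim), mirroring the new CastExecutorTest cases:

    CastExecutor<BinaryString, InternalMap> toMap =
            (CastExecutor<BinaryString, InternalMap>)
                    CastExecutors.resolve(
                            VarCharType.STRING_TYPE, DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
    InternalMap parsed = toMap.cast(BinaryString.fromString("{key1 -> 1, key2 -> 2}"));
    // parsed.size() == 2; keyArray()/valueArray() expose the casted entries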
diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java b/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java
new file mode 100644
index 0000000000..5a2379d273
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/StringToRowCastRule.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** {@link DataTypeFamily#CHARACTER_STRING} to {@link DataTypeRoot#ROW} cast rule. */
+class StringToRowCastRule extends AbstractCastRule<BinaryString, InternalRow> {
+
+ static final StringToRowCastRule INSTANCE = new StringToRowCastRule();
+
+ // Pattern for bracket format: {field1, field2, field3}
+    private static final Pattern BRACKET_ROW_PATTERN = Pattern.compile("^\\s*\\{(.*)\\}\\s*$");
+
+ // Pattern for SQL function format: STRUCT(field1, field2, field3)
+    private static final Pattern FUNCTION_ROW_PATTERN =
+            Pattern.compile("^\\s*STRUCT\\s*\\((.*)\\)\\s*$", Pattern.CASE_INSENSITIVE);
+
+ private StringToRowCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .input(DataTypeFamily.CHARACTER_STRING)
+ .target(DataTypeRoot.ROW)
+ .build());
+ }
+
+ @Override
+    public CastExecutor<BinaryString, InternalRow> create(DataType inputType, DataType targetType) {
+        RowType rowType = (RowType) targetType;
+        CastExecutor<BinaryString, Object>[] fieldCastExecutors = createFieldCastExecutors(rowType);
+        return value -> parseRow(value, fieldCastExecutors, rowType.getFieldCount());
+ }
+
+    private CastExecutor<BinaryString, Object>[] createFieldCastExecutors(RowType rowType) {
+ int fieldCount = rowType.getFieldCount();
+ @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object>[] fieldCastExecutors = new CastExecutor[fieldCount];
+
+ for (int i = 0; i < fieldCount; i++) {
+ DataType fieldType = rowType.getTypeAt(i);
+ @SuppressWarnings("unchecked")
+ CastExecutor<BinaryString, Object> executor =
+ (CastExecutor<BinaryString, Object>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, fieldType);
+ if (executor == null) {
+                throw new RuntimeException("Cannot cast string to row field type: " + fieldType);
+ }
+ fieldCastExecutors[i] = executor;
+ }
+ return fieldCastExecutors;
+ }
+
+ private InternalRow parseRow(
+ BinaryString value,
+ CastExecutor<BinaryString, Object>[] fieldCastExecutors,
+ int fieldCount) {
+ try {
+ String str = value.toString().trim();
+ if ("{}".equals(str) || "STRUCT()".equalsIgnoreCase(str)) {
+ return createNullRow(fieldCount);
+ }
+ String content = extractRowContent(str);
+ if (content.isEmpty()) {
+ return createNullRow(fieldCount);
+ }
+ List<String> fieldValues = splitRowFields(content);
+ if (fieldValues.size() != fieldCount) {
+ throw new RuntimeException(
+ "Row field count mismatch. Expected: "
+ + fieldCount
+ + ", Actual: "
+ + fieldValues.size());
+ }
+
+            return createRowFromFields(fieldValues, fieldCastExecutors, fieldCount);
+ } catch (Exception e) {
+            throw new RuntimeException("Cannot parse '" + value + "' as ROW: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+     * Extract content from row string, supporting both bracket format {f1, f2} and function format
+ * STRUCT(f1, f2).
+ */
+ private String extractRowContent(String str) {
+ // Try bracket format first: {field1, field2, field3}
+ Matcher bracketMatcher = BRACKET_ROW_PATTERN.matcher(str);
+ if (bracketMatcher.matches()) {
+ return bracketMatcher.group(1).trim();
+ }
+ // Try SQL function format: STRUCT(field1, field2, field3)
+ Matcher functionMatcher = FUNCTION_ROW_PATTERN.matcher(str);
+ if (functionMatcher.matches()) {
+ return functionMatcher.group(1).trim();
+ }
+
+ throw new RuntimeException(
+                "Invalid row format: " + str + ". Expected format: {f1, f2} or STRUCT(f1, f2)");
+ }
+
+ private GenericRow createNullRow(int fieldCount) {
+ GenericRow row = new GenericRow(fieldCount);
+ for (int i = 0; i < fieldCount; i++) {
+ row.setField(i, null);
+ }
+ return row;
+ }
+
+ private GenericRow createRowFromFields(
+ List<String> fieldValues,
+ CastExecutor<BinaryString, Object>[] fieldCastExecutors,
+ int fieldCount) {
+ GenericRow row = new GenericRow(fieldCount);
+ for (int i = 0; i < fieldCount; i++) {
+ String fieldValue = fieldValues.get(i).trim();
+ Object value = parseFieldValue(fieldValue, fieldCastExecutors[i]);
+ row.setField(i, value);
+ }
+ return row;
+ }
+
+ private Object parseFieldValue(
+            String fieldValue, CastExecutor<BinaryString, Object> castExecutor) {
+ return "null".equals(fieldValue)
+ ? null
+ : castExecutor.cast(BinaryString.fromString(fieldValue));
+ }
+
+ private List<String> splitRowFields(String content) {
+ List<String> fields = new ArrayList<>();
+ StringBuilder current = new StringBuilder();
+ Stack<Character> bracketStack = new Stack<>();
+ boolean inQuotes = false;
+ boolean escaped = false;
+
+ for (char c : content.toCharArray()) {
+ if (escaped) {
+ escaped = false;
+ } else if (c == '\\') {
+ escaped = true;
+ } else if (c == '"') {
+ inQuotes = !inQuotes;
+ } else if (!inQuotes) {
+ if (StringUtils.isOpenBracket(c)) {
+ bracketStack.push(c);
+                } else if (StringUtils.isCloseBracket(c) && !bracketStack.isEmpty()) {
+ bracketStack.pop();
+ } else if (c == ',' && bracketStack.isEmpty()) {
+ addCurrentField(fields, current);
+ continue;
+ }
+ }
+ current.append(c);
+ }
+
+ addCurrentField(fields, current);
+ return fields;
+ }
+
+ private void addCurrentField(List<String> fields, StringBuilder current) {
+ if (current.length() > 0) {
+ fields.add(current.toString());
+ current.setLength(0);
+ }
+ }
+}
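A minimal sketch of the row rule's edge cases, mirroring the new CastExecutorTest assertions: an empty '{}' default yields an all-null row, and a field-count mismatch fails fast:

    CastExecutor<BinaryString, InternalRow> toRow =
            (CastExecutor<BinaryString, InternalRow>)
                    CastExecutors.resolve(
                            VarCharType.STRING_TYPE, RowType.of(DataTypes.INT(), DataTypes.STRING()));
    InternalRow row = toRow.cast(BinaryString.fromString("{42, default_value}"));
    // row.getInt(0) == 42; row.getString(1).toString().equals("default_value")
    InternalRow empty = toRow.cast(BinaryString.fromString("{}")); // both fields null
    // toRow.cast(BinaryString.fromString("{1, a, extra}")) throws: field count mismatch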
diff --git a/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java b/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
index cb3e14c536..457895f839 100644
--- a/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/casting/CastExecutorTest.java
@@ -22,6 +22,9 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -54,6 +57,7 @@ import java.util.TimeZone;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
/** Test for {@link CastExecutor}. */
public class CastExecutorTest {
@@ -569,6 +573,151 @@ public class CastExecutorTest {
"12345678".getBytes());
}
+ @Test
+ public void testStringToArray() {
+ CastExecutor<BinaryString, InternalArray> stringToIntArray =
+ (CastExecutor<BinaryString, InternalArray>)
+ CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.INT()));
+
+        InternalArray result = stringToIntArray.cast(BinaryString.fromString("[1, 2, 3]"));
+ assertThat(result.size()).isEqualTo(3);
+ assertThat(result.getInt(0)).isEqualTo(1);
+ assertThat(result.getInt(1)).isEqualTo(2);
+ assertThat(result.getInt(2)).isEqualTo(3);
+
+ // Test empty array
+ result = stringToIntArray.cast(BinaryString.fromString("[]"));
+ assertThat(result.size()).isEqualTo(0);
+
+ // Test string to string array
+ CastExecutor<BinaryString, InternalArray> stringToStringArray =
+ (CastExecutor<BinaryString, InternalArray>)
+ CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.STRING()));
+
+        result = stringToStringArray.cast(BinaryString.fromString("[hello, world, test]"));
+ assertThat(result.size()).isEqualTo(3);
+ assertThat(result.getString(0).toString()).isEqualTo("hello");
+ assertThat(result.getString(1).toString()).isEqualTo("world");
+ assertThat(result.getString(2).toString()).isEqualTo("test");
+
+ // Test array with null values
+        result = stringToIntArray.cast(BinaryString.fromString("[1, null, 3]"));
+ assertThat(result.size()).isEqualTo(3);
+ assertThat(result.getInt(0)).isEqualTo(1);
+ assertThat(result.isNullAt(1)).isTrue();
+ assertThat(result.getInt(2)).isEqualTo(3);
+ }
+
+ @Test
+ public void testStringToMap() {
+ // Test string to map<string, int>
+ CastExecutor<BinaryString, InternalMap> stringToMap =
+ (CastExecutor<BinaryString, InternalMap>)
+ CastExecutors.resolve(
+ VarCharType.STRING_TYPE,
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        InternalMap result = stringToMap.cast(BinaryString.fromString("{key1 -> 1, key2 -> 2}"));
+ assertThat(result.size()).isEqualTo(2);
+
+ InternalArray keyArray = result.keyArray();
+ InternalArray valueArray = result.valueArray();
+ assertThat(keyArray.getString(0).toString()).isEqualTo("key2");
+ assertThat(valueArray.getInt(0)).isEqualTo(2);
+ assertThat(keyArray.getString(1).toString()).isEqualTo("key1");
+ assertThat(valueArray.getInt(1)).isEqualTo(1);
+
+ // Test empty map
+ result = stringToMap.cast(BinaryString.fromString("{}"));
+ assertThat(result.size()).isEqualTo(0);
+
+ // Test map with null values
+        result = stringToMap.cast(BinaryString.fromString("{key1 -> null, key2 -> 42}"));
+ assertThat(result.size()).isEqualTo(2);
+ keyArray = result.keyArray();
+ valueArray = result.valueArray();
+ assertThat(keyArray.getString(0).toString()).isEqualTo("key2");
+ assertThat(valueArray.getInt(0)).isEqualTo(42);
+ assertThat(keyArray.getString(1).toString()).isEqualTo("key1");
+ assertThat(valueArray.isNullAt(1)).isTrue();
+ }
+
+ @Test
+ public void testStringToRow() {
+ // Test string to row
+        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING(), DataTypes.BOOLEAN());
+ CastExecutor<BinaryString, InternalRow> stringToRow =
+ (CastExecutor<BinaryString, InternalRow>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, rowType);
+
+        InternalRow result = stringToRow.cast(BinaryString.fromString("{42, hello, true}"));
+ assertThat(result.getFieldCount()).isEqualTo(3);
+ assertThat(result.getInt(0)).isEqualTo(42);
+ assertThat(result.getString(1).toString()).isEqualTo("hello");
+ assertThat(result.getBoolean(2)).isTrue();
+
+ // Test empty row
+ result = stringToRow.cast(BinaryString.fromString("{}"));
+ assertThat(result.getFieldCount()).isEqualTo(3);
+ assertThat(result.isNullAt(0)).isTrue();
+ assertThat(result.isNullAt(1)).isTrue();
+ assertThat(result.isNullAt(2)).isTrue();
+
+ // Test row with null values
+        result = stringToRow.cast(BinaryString.fromString("{null, test, false}"));
+ assertThat(result.getFieldCount()).isEqualTo(3);
+ assertThat(result.isNullAt(0)).isTrue();
+ assertThat(result.getString(1).toString()).isEqualTo("test");
+ assertThat(result.getBoolean(2)).isFalse();
+ }
+
+ @Test
+ public void testStringToComplexTypesErrorHandling() {
+ // Test invalid array format
+ CastExecutor<BinaryString, InternalArray> stringToIntArray =
+ (CastExecutor<BinaryString, InternalArray>)
+ CastExecutors.resolve(
+                                VarCharType.STRING_TYPE, DataTypes.ARRAY(DataTypes.INT()));
+
+ try {
+            stringToIntArray.cast(BinaryString.fromString("[1, 2, 3")); // missing closing bracket
+ fail("Expected RuntimeException for invalid array format");
+ } catch (RuntimeException e) {
+ assertThat(e.getMessage()).contains("Cannot parse");
+ assertThat(e.getMessage()).contains("as ARRAY");
+ }
+
+ // Test invalid map format
+ CastExecutor<BinaryString, InternalMap> stringToMap =
+ (CastExecutor<BinaryString, InternalMap>)
+ CastExecutors.resolve(
+ VarCharType.STRING_TYPE,
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+ try {
+            stringToMap.cast(BinaryString.fromString("{key1 -> 1, key2")); // incomplete entry
+ fail("Expected RuntimeException for invalid map format");
+ } catch (RuntimeException e) {
+ assertThat(e.getMessage()).contains("Cannot parse");
+ assertThat(e.getMessage()).contains("as MAP");
+ }
+
+ // Test invalid row format
+ RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
+ CastExecutor<BinaryString, InternalRow> stringToRow =
+ (CastExecutor<BinaryString, InternalRow>)
+                        CastExecutors.resolve(VarCharType.STRING_TYPE, rowType);
+
+ try {
+            stringToRow.cast(BinaryString.fromString("{42, hello, extra}")); // too many fields
+ fail("Expected RuntimeException for field count mismatch");
+ } catch (RuntimeException e) {
+ assertThat(e.getMessage()).contains("field count mismatch");
+ }
+ }
+
@Test
public void testBinaryToString() {
// binary(5) to string(10)
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java b/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java
new file mode 100644
index 0000000000..9dcb1061fc
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DefaultValueRowTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link DefaultValueRow} with complex data types support. */
+public class DefaultValueRowTest {
+
+ @Test
+ public void testDefaultValueRowWithArrayType() {
+ // Test with Array default value
+ RowType rowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(
+ 1,
+ "tags",
+ DataTypes.ARRAY(DataTypes.STRING()),
+ "Default tags",
+ "[tag1, tag2, tag3]"),
+ new DataField(
+ 2,
+ "numbers",
+ DataTypes.ARRAY(DataTypes.INT()),
+ "Default numbers",
+ "[1, 2, 3]"));
+
+ DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+ GenericRow originalRow = new GenericRow(3);
+ originalRow.setField(0, 100);
+ originalRow.setField(1, null); // Use default tags
+ originalRow.setField(2, null); // Use default numbers
+
+ DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+ assertThat(wrappedRow.getInt(0)).isEqualTo(100);
+
+ // Check default array values
+ InternalArray tagsValue = wrappedRow.getArray(1);
+ assertThat(tagsValue).isNotNull();
+
+ InternalArray numbersValue = wrappedRow.getArray(2);
+ assertThat(numbersValue).isNotNull();
+ }
+
+ @Test
+ public void testDefaultValueRowWithMapType() {
+ // Test with Map default value
+ RowType rowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(
+ 1,
+ "properties",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+ "Default properties",
+ "{key1 -> value1, key2 -> value2}"));
+
+ DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+ GenericRow originalRow = new GenericRow(2);
+ originalRow.setField(0, 200);
+ originalRow.setField(1, null); // Use default properties
+
+ DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+ assertThat(wrappedRow.getInt(0)).isEqualTo(200);
+
+ InternalMap propertiesValue = wrappedRow.getMap(1);
+ assertThat(propertiesValue).isNotNull();
+ }
+
+ @Test
+ public void testDefaultValueRowWithRowType() {
+ // Test with Row default value
+ RowType innerRowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
+ RowType rowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(
+ 1,
+ "nested",
+ innerRowType,
+ "Default nested",
+ "{42, default_value}"));
+
+ DefaultValueRow defaultValueRow = DefaultValueRow.create(rowType);
+
+ GenericRow originalRow = new GenericRow(2);
+ originalRow.setField(0, 300);
+ originalRow.setField(1, null); // Use default nested row
+
+ DefaultValueRow wrappedRow = defaultValueRow.replaceRow(originalRow);
+
+ assertThat(wrappedRow.getInt(0)).isEqualTo(300);
+
+ InternalRow nestedValue = wrappedRow.getRow(1, 2);
+ assertThat(nestedValue).isNotNull();
+ }
+
+ @Test
+ public void testDefaultValueRowErrorHandlingForComplexTypes() {
+ // Test error handling for invalid array default
+ RowType rowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(
+ 1,
+ "invalid_array",
+ DataTypes.ARRAY(DataTypes.INT()),
+ "Invalid array",
+ "[1, 2, invalid]"));
+
+ assertThatThrownBy(() -> DefaultValueRow.create(rowType))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Cannot parse");
+
+ // Test error handling for invalid map default
+ RowType mapRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(
+ 1,
+ "invalid_map",
+                                DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ "Invalid map",
+ "{key1 -> invalid_value}"));
+
+ assertThatThrownBy(() -> DefaultValueRow.create(mapRowType))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Cannot parse");
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index dd7750b2f0..8430ff3e9e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -52,6 +52,108 @@ public class BranchSqlITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
}
+ @Test
+ public void testArrayDefaultValue() throws Exception {
+        sql("CREATE TABLE T_ARRAY (id INT, tags ARRAY<STRING>, numbers ARRAY<INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'tags', '[tag1, tag2]')");
+        sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'numbers', '[1, 2, 3]')");
+
+ sql("INSERT INTO T_ARRAY (id) VALUES (1), (2)");
+ assertThat(collectResult("SELECT * FROM T_ARRAY"))
+ .containsExactlyInAnyOrder(
+                        "+I[1, [tag1, tag2], [1, 2, 3]]", "+I[2, [tag1, tag2], [1, 2, 3]]");
+
+ sql("CREATE TABLE T_EMPTY_ARRAY (id INT, empty_tags ARRAY<STRING>)");
+        sql("CALL sys.alter_column_default_value('default.T_EMPTY_ARRAY', 'empty_tags', '[]')");
+ sql("INSERT INTO T_EMPTY_ARRAY (id) VALUES (1)");
+ assertThat(collectResult("SELECT * FROM T_EMPTY_ARRAY"))
+ .containsExactlyInAnyOrder("+I[1, []]");
+ }
+
+ @Test
+ public void testMapDefaultValue() throws Exception {
+ sql("CREATE TABLE T_MAP (id INT, properties MAP<STRING, STRING>)");
+ sql(
+                "CALL sys.alter_column_default_value('default.T_MAP', 'properties', '{key1 -> value1, key2 -> value2}')");
+
+ sql("INSERT INTO T_MAP (id) VALUES (1), (2)");
+ assertThat(collectResult("SELECT * FROM T_MAP"))
+ .containsExactlyInAnyOrder(
+                        "+I[1, {key1=value1, key2=value2}]", "+I[2, {key1=value1, key2=value2}]");
+
+ sql("CREATE TABLE T_EMPTY_MAP (id INT, empty_map MAP<STRING, INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_EMPTY_MAP', 'empty_map', '{}')");
+ sql("INSERT INTO T_EMPTY_MAP (id) VALUES (1)");
+ assertThat(collectResult("SELECT * FROM T_EMPTY_MAP"))
+ .containsExactlyInAnyOrder("+I[1, {}]");
+ }
+
+ @Test
+ public void testRowDefaultValue() throws Exception {
+ sql("CREATE TABLE T_STRUCT (id INT, nested ROW<x INT, y STRING>)");
+ sql(
+                "CALL sys.alter_column_default_value('default.T_STRUCT', 'nested', '{42, default_value}')");
+
+ sql("INSERT INTO T_STRUCT (id) VALUES (1), (2)");
+ assertThat(collectResult("SELECT * FROM T_STRUCT"))
+ .containsExactlyInAnyOrder(
+                        "+I[1, +I[42, default_value]]", "+I[2, +I[42, default_value]]");
+
+ sql(
+                "CREATE TABLE T_COMPLEX_STRUCT (id INT, config ROW<enabled BOOLEAN, timeout INT, name STRING>)");
+ sql(
+                "CALL sys.alter_column_default_value('default.T_COMPLEX_STRUCT', 'config', '{true, 30, config_name}')");
+ sql("INSERT INTO T_COMPLEX_STRUCT (id) VALUES (1)");
+ assertThat(collectResult("SELECT * FROM T_COMPLEX_STRUCT"))
+ .containsExactlyInAnyOrder("+I[1, +I[true, 30, config_name]]");
+ }
+
+ @Test
+ public void testMixedComplexDefaultValue() throws Exception {
+ sql(
+                "CREATE TABLE T_MIXED (id INT, name STRING, tags ARRAY<STRING>, metadata MAP<STRING, STRING>, config ROW<enabled BOOLEAN, timeout INT>)");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'name', 'default_name')");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'tags', '[default_tag]')");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_MIXED', 'metadata', '{created_by -> system}')");
+        sql("CALL sys.alter_column_default_value('default.T_MIXED', 'config', '{true, 30}')");
+
+ sql("INSERT INTO T_MIXED (id) VALUES (1)");
+ assertThat(collectResult("SELECT * FROM T_MIXED"))
+ .containsExactlyInAnyOrder(
+                        "+I[1, default_name, [default_tag], {created_by=system}, +I[true, 30]]");
+
+ sql("INSERT INTO T_MIXED (id, name) VALUES (2, 'custom_name')");
+ assertThat(collectResult("SELECT * FROM T_MIXED WHERE id = 2"))
+ .containsExactlyInAnyOrder(
+                        "+I[2, custom_name, [default_tag], {created_by=system}, +I[true, 30]]");
+ }
+
+ @Test
+ public void testNestedComplexDefaultValue() throws Exception {
+ sql(
+ "CREATE TABLE T_NESTED ("
+ + "id INT, "
+ + "nested_array ARRAY<ROW<name STRING, val INT>>, "
+                        + "nested_struct ROW<info ROW<x INT, y STRING>, enabled BOOLEAN>"
+ + ")");
+
+ sql(
+                "CALL sys.alter_column_default_value('default.T_NESTED', 'nested_array', '[{item1, 10}, {item2, 20}]')");
+        sql(
+                "CALL sys.alter_column_default_value('default.T_NESTED', 'nested_struct', '{{42, nested_value}, true}')");
+
+ sql("INSERT INTO T_NESTED (id) VALUES (1)");
+ assertThat(collectResult("SELECT * FROM T_NESTED"))
+ .containsExactlyInAnyOrder(
+                        "+I[1, [+I[item1, 10], +I[item2, 20]], +I[+I[42, nested_value], true]]");
+
+        sql("INSERT INTO T_NESTED (id, nested_array) VALUES (2, ARRAY[ROW('custom', 99)])");
+ assertThat(collectResult("SELECT * FROM T_NESTED WHERE id = 2"))
+ .containsExactlyInAnyOrder(
+                        "+I[2, [+I[custom, 99]], +I[+I[42, nested_value], true]]");
+ }
+
@Test
public void testDefaultValueForPkTableDynamicBucket() throws Exception {
sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT)");
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 237e1ac135..51c2e63b5f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -128,6 +128,216 @@ public class SparkWriteITCase {
"[[1,2,my_value], [2,2,my_value], [3,2,my_value],
[4,2,my_value], [5,3,my_value]]");
}
+ @Test
+ public void testWriteWithArrayDefaultValue() {
+ // Test Array type default value - using Spark SQL function syntax
+ spark.sql(
+                "CREATE TABLE T (id INT, tags ARRAY<STRING> DEFAULT ARRAY('tag1', 'tag2'), numbers ARRAY<INT> DEFAULT ARRAY(1, 2, 3)) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ // test show create table for array
+ List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+ assertThat(show.toString())
+ .contains("tags ARRAY<STRING> DEFAULT ARRAY('tag1', 'tag2')")
+ .contains("numbers ARRAY<INT> DEFAULT ARRAY(1, 2, 3)");
+
+ // test partial write with array defaults
+ spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        // Support both Spark 3.x (WrappedArray) and Spark 4.x (ArraySeq) formats
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('tag1', 'tag2')"),
+                        s -> assertThat(s).contains("ArraySeq('tag1', 'tag2')"));
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+ s -> assertThat(s).contains("WrappedArray(1, 2, 3)"),
+ s -> assertThat(s).contains("ArraySeq(1, 2, 3)"));
+ assertThat(rows.size()).isEqualTo(2);
+
+ // test write with DEFAULT keyword for arrays
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('tag1', 'tag2')"),
+                        s -> assertThat(s).contains("ArraySeq('tag1', 'tag2')"));
+ assertThat(rows.size()).isEqualTo(3);
+
+ // test empty array default value
+ spark.sql("DROP TABLE IF EXISTS T");
+ spark.sql(
+                "CREATE TABLE T (id INT, empty_array ARRAY<STRING> DEFAULT ARRAY()) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+ s -> assertThat(s).contains("WrappedArray()"),
+ s -> assertThat(s).contains("ArraySeq()"));
+
+ // test write with DEFAULT keyword for empty array
+ spark.sql("INSERT INTO T VALUES (2, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+ s -> assertThat(s).contains("WrappedArray()"),
+ s -> assertThat(s).contains("ArraySeq()"));
+ assertThat(rows.size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testWriteWithMapDefaultValue() {
+ // Test Map type default value - using Spark SQL function syntax
+ spark.sql(
+                "CREATE TABLE T (id INT, properties MAP<STRING, STRING> DEFAULT MAP('key1', 'value1', 'key2', 'value2')) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ // test show create table for map
+ List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+ assertThat(show.toString())
+ .contains(
+                        "properties MAP<STRING, STRING> DEFAULT MAP('key1', 'value1', 'key2', 'value2')");
+
+ // test partial write with map defaults
+ spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("'key1' -> 'value1'");
+ assertThat(rows.toString()).contains("'key2' -> 'value2'");
+ assertThat(rows.size()).isEqualTo(2);
+
+ // test write with DEFAULT keyword for map
+ spark.sql("INSERT INTO T VALUES (3, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("'key1' -> 'value1'");
+ assertThat(rows.toString()).contains("'key2' -> 'value2'");
+ assertThat(rows.size()).isEqualTo(3);
+
+ // test empty map default value
+ spark.sql("DROP TABLE IF EXISTS T");
+ spark.sql(
+                "CREATE TABLE T (id INT, empty_map MAP<STRING, INT> DEFAULT MAP()) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("Map()");
+
+ // test write with DEFAULT keyword for empty map
+ spark.sql("INSERT INTO T VALUES (2, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("Map()");
+ assertThat(rows.size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testWriteWithStructDefaultValue() {
+ // Test Struct/Row type default value - using Spark SQL function syntax
+ spark.sql(
+                "CREATE TABLE T (id INT, nested STRUCT<x: INT, y: STRING> DEFAULT STRUCT(42, 'default_value')) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ // test show create table for struct
+ List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+ assertThat(show.toString())
+                .contains("nested STRUCT<x: INT, y: STRING> DEFAULT STRUCT(42, 'default_value')");
+
+ // test partial write with struct defaults
+ spark.sql("INSERT INTO T (id) VALUES (1), (2)").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("[42,'default_value']");
+ assertThat(rows.size()).isEqualTo(2);
+
+ // test write with DEFAULT keyword for struct
+ spark.sql("INSERT INTO T VALUES (3, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("[42,'default_value']");
+ assertThat(rows.size()).isEqualTo(3);
+
+ // test complex struct with multiple types
+ spark.sql("DROP TABLE IF EXISTS T");
+ spark.sql(
+                "CREATE TABLE T (id INT, config STRUCT<enabled: BOOLEAN, timeout: INT, name: STRING> DEFAULT STRUCT(true, 30, 'config_name')) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("[true,30,'config_name']");
+ assertThat(rows.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testWriteWithNestedComplexDefaultValue() {
+        // Test nested complex types with default values - using Spark SQL function syntax
+ spark.sql(
+ "CREATE TABLE T (id INT, "
+                        + "nested_array ARRAY<STRUCT<name: STRING, value: INT>> DEFAULT ARRAY(STRUCT('item1', 10), STRUCT('item2', 20)), "
+                        + "map_of_arrays MAP<STRING, ARRAY<INT>> DEFAULT MAP('list1', ARRAY(1, 2), 'list2', ARRAY(3, 4))) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ // test show create table for nested complex types
+ List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+ assertThat(show.toString())
+ .contains(
+                        "nested_array ARRAY<STRUCT<name: STRING, value: INT>> DEFAULT ARRAY(STRUCT('item1', 10), STRUCT('item2', 20))")
+                .contains(
+                        "map_of_arrays MAP<STRING, ARRAY<INT>> DEFAULT MAP('list1', ARRAY(1, 2), 'list2', ARRAY(3, 4))");
+
+ // test partial write with nested complex type defaults
+ spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.size()).isEqualTo(1);
+ assertThat(rows.get(0).getInt(0)).isEqualTo(1);
+
+ // test write with DEFAULT keyword for nested complex types
+        spark.sql("INSERT INTO T VALUES (2, DEFAULT, DEFAULT)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.size()).isEqualTo(2);
+
+ // test mixed simple and complex types with defaults
+ spark.sql("DROP TABLE IF EXISTS T");
+ spark.sql(
+ "CREATE TABLE T (id INT, "
+ + "name STRING DEFAULT 'default_name', "
+ + "tags ARRAY<STRING> DEFAULT ARRAY('default_tag'), "
+                        + "metadata MAP<STRING, STRING> DEFAULT MAP('created_by', 'system'), "
+                        + "config STRUCT<enabled: BOOLEAN, timeout: INT> DEFAULT STRUCT(true, 30)) TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ // test partial write with mixed defaults
+ spark.sql("INSERT INTO T (id) VALUES (1)").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("default_name");
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('default_tag')"),
+                        s -> assertThat(s).contains("ArraySeq('default_tag')"));
+ assertThat(rows.toString()).contains("Map('created_by' -> 'system')");
+ assertThat(rows.toString()).contains("[true,30]");
+
+ // test selective column insertion with mixed defaults
+        spark.sql("INSERT INTO T (id, name) VALUES (2, 'custom_name')").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("custom_name");
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray('default_tag')"),
+                        s -> assertThat(s).contains("ArraySeq('default_tag')"));
+ assertThat(rows.size()).isEqualTo(2);
+
+ // test write with some DEFAULT keywords for mixed types
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, ARRAY('custom_tag'), DEFAULT, DEFAULT)")
+ .collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).contains("default_name");
+ assertThat(rows.toString())
+ .satisfiesAnyOf(
+                        s -> assertThat(s).contains("WrappedArray(custom_tag)"),
+ s -> assertThat(s).contains("ArraySeq(custom_tag)"));
+ assertThat(rows.size()).isEqualTo(3);
+ }
+
@ParameterizedTest
@ValueSource(strings = {"order", "zorder", "hilbert"})
public void testWriteWithClustering(String clusterStrategy) {