This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 576919bfab [Feature][Connector-V2][Assert] Support field type assert
and field value equality assert for full data types (#6275)
576919bfab is described below
commit 576919bfab21a8788fdad54b0ff3a024268005db
Author: Chengyu Yan <[email protected]>
AuthorDate: Tue Jan 30 16:01:47 2024 +0800
[Feature][Connector-V2][Assert] Support field type assert and field value
equality assert for full data types (#6275)
- Support null, row, array, map type check
- Support array, map, row value equal check
- Add an assert rule that a value can be null
- Support declaring data types and data values for FakeSource and
AssertSink in a consistent way
- Fix JsonDeserializationSchema never converting the keys of a map
---
docs/en/concept/schema-feature.md | 4 +-
docs/en/connector-v2/sink/Assert.md | 370 +++++++++++++++++++--
docs/en/connector-v2/source/FakeSource.md | 5 +-
release-note.md | 1 +
.../catalog/SeaTunnelDataTypeConvertorUtil.java | 29 +-
.../SeaTunnelDataTypeConvertorUtilTest.java | 77 +++++
seatunnel-connectors-v2/connector-assert/pom.xml | 5 +
.../assertion/excecutor/AssertExecutor.java | 314 +++++++++++++----
.../seatunnel/assertion/rule/AssertFieldRule.java | 4 +-
.../seatunnel/assertion/rule/AssertRuleParser.java | 83 +++--
.../flink/assertion/AssertExecutorTest.java | 285 +++++++++++++++-
.../flink/assertion/rule/AssertRuleParserTest.java | 39 ++-
.../connector/assertion/FakeSourceToAssertIT.java | 22 ++
.../assertion/fake_full_types_to_assert.conf | 308 +++++++++++++++++
.../resources/assertion/fake_row_to_assert.conf | 114 +++++++
.../test/resources/fake_to_assert_with_range.conf | 6 +-
.../resources/fake_to_assert_with_template.conf | 6 +-
.../json/local_file_json_lzo_to_console.conf | 2 +-
.../text/local_file_text_lzo_to_assert.conf | 2 +-
.../json/oss_file_json_lzo_to_console.conf | 2 +-
.../text/oss_file_text_lzo_to_assert.conf | 2 +-
.../src/test/resources/iceberg/iceberg_source.conf | 2 +-
.../src/test/resources/iceberg/iceberg_source.conf | 2 +-
.../resources/jdbc_hive_source_and_assert.conf | 6 +-
.../test/resources/avro/kafka_avro_to_assert.conf | 2 +-
.../jsonFormatIT/kafka_source_json_to_console.conf | 2 +-
.../kafka/kafkasource_earliest_to_console.conf | 2 +-
...ce_format_error_handle_way_fail_to_console.conf | 2 +-
...ce_format_error_handle_way_skip_to_console.conf | 2 +-
.../kafka/kafkasource_group_offset_to_console.conf | 2 +-
.../kafka/kafkasource_latest_to_console.conf | 2 +-
.../kafkasource_specific_offsets_to_console.conf | 2 +-
.../kafka/kafkasource_timestamp_to_console.conf | 2 +-
.../textFormatIT/kafka_source_text_to_console.conf | 2 +-
...ource_text_to_console_assert_catalog_table.conf | 2 +-
.../test/resources/batch_pulsar_to_console.conf | 6 +-
.../resources/rocketmq-source_json_to_console.conf | 2 +-
.../resources/rocketmq-source_text_to_console.conf | 2 +-
.../rocketmq_source_earliest_to_console.conf | 2 +-
.../rocketmq_source_group_offset_to_console.conf | 2 +-
.../rocketmq_source_latest_to_console.conf | 2 +-
...ocketmq_source_specific_offsets_to_console.conf | 2 +-
.../rocketmq_source_timestamp_to_console.conf | 2 +-
.../resources/sql_transform/func_datetime.conf | 14 +-
.../test/resources/sql_transform/func_string.conf | 6 +-
.../seatunnel/format/json/JsonToRowConverters.java | 13 +-
.../format/json/JsonRowDataSerDeSchemaTest.java | 90 +++++
47 files changed, 1644 insertions(+), 211 deletions(-)
diff --git a/docs/en/concept/schema-feature.md b/docs/en/concept/schema-feature.md
index 04463c2563..15f8186cce 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -91,7 +91,9 @@ columns = [
#### How to declare type supported
-SeaTunnel provides a simple and direct way to declare basic types. The keyword names for basic types can be used directly as type declarations, and SeaTunnel is case-insensitive to type keywords. Basic type keywords include `string`, `boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`, `time`, `timestamp`, and `null`. For example, if you need to declare a field with integer type, you can simply define the field as `int` or `"int"`.
+SeaTunnel provides a simple and direct way to declare basic types. Basic type keywords include `string`, `boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`, `time`, `timestamp`, and `null`. The keyword names for basic types can be used directly as type declarations, and SeaTunnel is case-insensitive to type keywords. For example, if you need to declare a field with integer type, you can simply define the field as `int` or `"int"`.
+
+> The null type declaration must be enclosed in double quotes, like `"null"`. This approach helps avoid confusion with [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)'s `null` type which represents undefined object.
When declaring complex types (such as **decimal**, **array**, **map**, and **row**), pay attention to specific considerations.
- When declaring a decimal type, precision and scale settings are required, and the type definition follows the format `decimal(precision, scale)`. It's essential to emphasize that the declaration of the decimal type must be enclosed in `"`; you cannot use the type name directly, as with basic types. For example, when declaring a decimal field with precision 10 and scale 2, you specify the field type as `"decimal(10,2)"`.
diff --git a/docs/en/connector-v2/sink/Assert.md b/docs/en/connector-v2/sink/Assert.md
index 8257ff8f65..e02d0fc6b9 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -12,37 +12,38 @@ A flink sink plugin which can assert illegal data by user defined rules
## Options
-| Name | Type | Required | Default |
-|------------------------------------------------------------------------------------------------|------------|----------|---------|
-| rules | ConfigMap | yes | - |
-| rules.field_rules | string | yes | - |
-| rules.field_rules.field_name | string | yes | - |
-| rules.field_rules.field_type | string | no | - |
-| rules.field_rules.field_value | ConfigList | no | - |
-| rules.field_rules.field_value.rule_type | string | no | - |
-| rules.field_rules.field_value.rule_value | double | no | - |
-| rules.row_rules | string | yes | - |
-| rules.row_rules.rule_type | string | no | - |
-| rules.row_rules.rule_value | string | no | - |
-| rules.catalog_table_rule | ConfigMap | no | - |
-| rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - |
-| rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - |
-| rules.catalog_table_rule.primary_key_rule.primary_key_columns | list | no | - |
-| rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - |
-| rules.catalog_table_rule.column_rule | ConfigList | no | - |
-| rules.catalog_table_rule.column_rule.name | string | no | - |
-| rules.catalog_table_rule.column_rule.type | string | no | - |
-| rules.catalog_table_rule.column_rule.column_length | int | no | - |
-| rules.catalog_table_rule.column_rule.nullable | boolean | no | - |
-| rules.catalog_table_rule.column_rule.default_value | string | no | - |
-| rules.catalog_table_rule.column_rule.comment | comment | no | - |
-| rules.table-names | list | no | - |
-| common-options | | no | - |
+| Name | Type | Required | Default |
+|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
+| rules | ConfigMap | yes | - |
+| rules.field_rules | string | yes | - |
+| rules.field_rules.field_name | string\|ConfigMap | yes | - |
+| rules.field_rules.field_type | string | no | - |
+| rules.field_rules.field_value | ConfigList | no | - |
+| rules.field_rules.field_value.rule_type | string | no | - |
+| rules.field_rules.field_value.rule_value | numeric | no | - |
+| rules.field_rules.field_value.equals_to | boolean\|numeric\|string\|ConfigList\|ConfigMap | no | - |
+| rules.row_rules | string | yes | - |
+| rules.row_rules.rule_type | string | no | - |
+| rules.row_rules.rule_value | string | no | - |
+| rules.catalog_table_rule | ConfigMap | no | - |
+| rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - |
+| rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - |
+| rules.catalog_table_rule.primary_key_rule.primary_key_columns | ConfigList | no | - |
+| rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - |
+| rules.catalog_table_rule.column_rule | ConfigList | no | - |
+| rules.catalog_table_rule.column_rule.name | string | no | - |
+| rules.catalog_table_rule.column_rule.type | string | no | - |
+| rules.catalog_table_rule.column_rule.column_length | int | no | - |
+| rules.catalog_table_rule.column_rule.nullable | boolean | no | - |
+| rules.catalog_table_rule.column_rule.default_value | string | no | - |
+| rules.catalog_table_rule.column_rule.comment | comment | no | - |
+| rules.table-names | ConfigList | no | - |
+| common-options | | no | - |
### rules [ConfigMap]
@@ -56,9 +57,9 @@ field rules for field validation
field name(string)
-### field_type [string]
+### field_type [string | ConfigMap]
-field type (string), e.g. `string,boolean,byte,short,int,long,float,double,char,void,BigInteger,BigDecimal,Instant`
+Field type declarations should adhere to this [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
### field_value [ConfigList]
@@ -68,6 +69,7 @@ A list value rule define the data value validation
The following rules are supported for now
- NOT_NULL `value can't be null`
+- NULL `value can be null`
- MIN `define the minimum value of data`
- MAX `define the maximum value of data`
- MIN_LENGTH `define the minimum string length of a string data`
@@ -75,9 +77,17 @@ The following rules are supported for now
- MIN_ROW `define the minimun number of rows`
- MAX_ROW `define the maximum number of rows`
-### rule_value [double]
+### rule_value [numeric]
-the value related to rule type
+The value related to rule type. When the `rule_type` is `MIN`, `MAX`, `MIN_LENGTH`, `MAX_LENGTH`, `MIN_ROW` or `MAX_ROW`, users need to assign a value to the `rule_value`.
+
+### equals_to [boolean | numeric | string | ConfigList | ConfigMap]
+
+`equals_to` is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to `equals_to`. These types are detailed [here](../../concept/schema-feature.md#what-type-supported-at-now). For instance, if one field is a row with three fields, and the declaration of row type is `{a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}`, users can assign the value `[["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, [...]
+
+> The way of defining field values is consistent with [FakeSource](../source/FakeSource.md#customize-the-data-content-simple).
+>
+> `equals_to` cannot be applied to `null` type fields. However, users can use the rule type `NULL` for verification, such as `{rule_type = NULL}`.
### catalog_table_rule [ConfigMap]
@@ -131,6 +141,7 @@ Assert {
field_value = [
{
rule_type = NOT_NULL
+ equals_to = 23
},
{
rule_type = MIN
@@ -178,7 +189,296 @@ Assert {
}
}
+```
+
+Here is a more complex example about `equals_to`. The example involves FakeSource. You may want to learn it, please read this [document](../source/FakeSource.md).
+
+```hocon
+source {
+ FakeSource {
+ row.num = 1
+ schema = {
+ fields {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<time, string>"
+ c_map_nest = "map<string, {c_int = int, c_string = string}>"
+ c_row = {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<string, string>"
+ }
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ "{ 12:01:26 = v0 }",
+ { k1 = [123, "BBB-BB"]},
+ [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ { k0 = v0 }
+ ]
+ ]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+sink{
+ Assert {
+ source_table_name = "fake"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_null
+ field_type = "null"
+ field_value = [
+ {
+ rule_type = NULL
+ }
+ ]
+ },
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "AAA"
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = false
+ }
+ ]
+ },
+ {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 1
+ }
+ ]
+ },
+ {
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 1
+ }
+ ]
+ },
+ {
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 333
+ }
+ ]
+ },
+ {
+ field_name = c_bigint
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 323232
+ }
+ ]
+ },
+ {
+ field_name = c_float
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 3.1
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 9.33333
+ }
+ ]
+ },
+ {
+ field_name = c_decimal
+ field_type = "decimal(30, 8)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 99999.99999999
+ }
+ ]
+ },
+ {
+ field_name = c_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2012-12-21"
+ }
+ ]
+ },
+ {
+ field_name = c_timestamp
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2012-12-21T12:34:56"
+ }
+ ]
+ },
+ {
+ field_name = c_time
+ field_type = time
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "12:34:56"
+ }
+ ]
+ },
+ {
+ field_name = c_bytes
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "bWlJWmo="
+ }
+ ]
+ },
+ {
+ field_name = c_array
+ field_type = "array<int>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [0, 1, 2]
+ }
+ ]
+ },
+ {
+ field_name = c_map
+ field_type = "map<time, string>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "{ 12:01:26 = v0 }"
+ }
+ ]
+ },
+ {
+ field_name = c_map_nest
+ field_type = "map<string, {c_int = int, c_string = string}>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = { k1 = [123, "BBB-BB"] }
+ }
+ ]
+ },
+ {
+ field_name = c_row
+ field_type = {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<string, string>"
+ }
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ { k0 = v0 }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
```
## Changelog
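The Assert.md changes above document `equals_to` as accepting nested arrays, maps, and rows. In Java, arrays do not override `equals()`, so comparing such values needs an explicit structural walk. The following is a minimal standalone sketch of that idea only; `DeepEqualsSketch` and `deepEquals` are hypothetical names, the sketch ignores declared SeaTunnel types, and it is not the connector's implementation.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper (not part of SeaTunnel): structural equality for nested values. */
final class DeepEqualsSketch {

    private DeepEqualsSketch() {}

    /** Returns true when both values have the same structure and contents. */
    static boolean deepEquals(Object actual, Object expected) {
        if (actual == null || expected == null) {
            // Only two nulls are equal to each other.
            return actual == expected;
        }
        if (actual instanceof byte[] && expected instanceof byte[]) {
            // Arrays use identity equals(), so compare the contents explicitly.
            return Arrays.equals((byte[]) actual, (byte[]) expected);
        }
        if (actual instanceof Object[] && expected instanceof Object[]) {
            Object[] a = (Object[]) actual;
            Object[] e = (Object[]) expected;
            if (a.length != e.length) {
                return false;
            }
            for (int i = 0; i < a.length; i++) {
                if (!deepEquals(a[i], e[i])) {
                    return false;
                }
            }
            return true;
        }
        if (actual instanceof Map && expected instanceof Map) {
            Map<?, ?> a = (Map<?, ?>) actual;
            Map<?, ?> e = (Map<?, ?>) expected;
            if (a.size() != e.size()) {
                return false;
            }
            // Same size plus every expected entry present means the maps match.
            for (Map.Entry<?, ?> entry : e.entrySet()) {
                if (!a.containsKey(entry.getKey())
                        || !deepEquals(a.get(entry.getKey()), entry.getValue())) {
                    return false;
                }
            }
            return true;
        }
        // Scalars (String, Integer, BigDecimal, ...) fall back to equals().
        return Objects.equals(actual, expected);
    }
}
```

A row can be modelled here as an `Object[]` of field values, so `deepEquals(rowA, rowB)` walks the fields in order and recurses into any nested array or map.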
diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 43cc8dc671..c85df37261 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -68,12 +68,13 @@ just for some test cases such as type conversion or connector new feature testin
### Simple:
-> This example Randomly generates data of a specified type
+> This example Randomly generates data of a specified type. If you want to learn how to declare field types, click [here](../../concept/schema-feature.md#how-to-declare-type-supported).
```hocon
schema = {
fields {
c_map = "map<string, array<int>>"
+ c_map_nest = "map<string, {c_int = int, c_string = string}>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
@@ -190,6 +191,8 @@ source {
}
```
+> Due to the constraints of the [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md) specification, users cannot directly create byte sequence objects. FakeSource uses strings to assign `bytes` type values. In the example above, the `bytes` type field is assigned `"bWlJWmo="`, which is encoded from "miIZj" with **base64**. Hence, when assigning values to `bytes` type fields, please use strings encoded with **base64**.
+
### Specified Data number Simple:
> This case specifies the number of data generated and the length of the generated value
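The FakeSource.md note above says `bytes` fields take base64 strings and gives `"bWlJWmo="` as the encoding of "miIZj". A small sketch using the JDK's `java.util.Base64` shows how such a value could be produced and checked. The class name `BytesFieldBase64Sketch` is hypothetical, and the use of the basic RFC 4648 alphabet with UTF-8 text is an assumption, since the note only says "base64".

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper (not part of SeaTunnel) for preparing `bytes` field values. */
final class BytesFieldBase64Sketch {

    private BytesFieldBase64Sketch() {}

    /** Encodes UTF-8 text with the basic base64 alphabet (an assumption, see lead-in). */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a basic base64 string back to UTF-8 text. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

With these helpers, `encode("miIZj")` yields `bWlJWmo=`, the value used for `c_bytes` in the examples of this commit, and `decode("bWlJWmo=")` returns `miIZj`.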
diff --git a/release-note.md b/release-note.md
index 831018d273..e31dad48fe 100644
--- a/release-note.md
+++ b/release-note.md
@@ -187,6 +187,7 @@
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
+- [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
### Zeta(ST-Engine)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index ce71a46051..cc7ec83fb1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -43,7 +43,8 @@ public class SeaTunnelDataTypeConvertorUtil {
String field, String columnType) {
SqlType sqlType = null;
try {
- sqlType = SqlType.valueOf(columnType.toUpperCase().replace(" ", ""));
+ String compatible = compatibleTypeDeclare(columnType);
+ sqlType = SqlType.valueOf(compatible.toUpperCase().replace(" ", ""));
} catch (IllegalArgumentException e) {
// nothing
}
@@ -84,6 +85,32 @@ public class SeaTunnelDataTypeConvertorUtil {
}
}
+ /**
+ * User-facing data type declarations will adhere to the specifications outlined in
+ * schema-feature.md. To maintain backward compatibility, this function will transform type
+ * declarations into standard form, including: <code>long -> bigint</code>, <code>
+ * short -> smallint</code>, and <code>byte -> tinyint</code>.
+ *
+ * <p>In a future version, user-facing data type declarations will strictly follow the
+ * specifications, and this function will be removed.
+ *
+ * @param declare
+ * @return compatible type
+ */
+ @Deprecated
+ private static String compatibleTypeDeclare(String declare) {
+ switch (declare.trim().toUpperCase()) {
+ case "LONG":
+ return "BIGINT";
+ case "SHORT":
+ return "SMALLINT";
+ case "BYTE":
+ return "TINYINT";
+ default:
+ return declare;
+ }
+ }
+
private static SeaTunnelDataType<?> parseComplexDataType(String field, String columnStr) {
String column = columnStr.toUpperCase().replace(" ", "");
if (column.startsWith(SqlType.MAP.name())) {
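The `compatibleTypeDeclare` method added in this file maps the legacy keywords `long`, `short`, and `byte` to `bigint`, `smallint`, and `tinyint`. The standalone sketch below mirrors that switch so its behaviour can be tried without the SeaTunnel classes. `LegacyTypeAliasSketch` and `normalize` are hypothetical names, and `Locale.ROOT` is a deliberate addition that the committed code does not use.

```java
import java.util.Locale;

/** Hypothetical standalone mirror of the legacy alias mapping shown in the diff above. */
final class LegacyTypeAliasSketch {

    private LegacyTypeAliasSketch() {}

    /**
     * Maps a legacy top-level keyword to its standard form; any other
     * declaration is returned unchanged, exactly as it was passed in.
     */
    static String normalize(String declare) {
        // Locale.ROOT keeps the upper-casing independent of the default locale.
        switch (declare.trim().toUpperCase(Locale.ROOT)) {
            case "LONG":
                return "BIGINT";
            case "SHORT":
                return "SMALLINT";
            case "BYTE":
                return "TINYINT";
            default:
                return declare;
        }
    }
}
```

Note that the switch only matches a whole declaration: `normalize("array<long>")` returns its input unchanged, so in this sketch a nested keyword would have to be normalized when the element type is parsed on its own.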
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
index f126722d83..56934666bc 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.junit.jupiter.api.Assertions;
@@ -89,4 +94,76 @@ public class SeaTunnelDataTypeConvertorUtilTest {
"Unsupported parse SeaTunnel Type from '%s'.",
invalidTypeDeclaration);
Assertions.assertEquals(expectedMsg6, exception6.getMessage());
}
+
+ @Test
+ public void testCompatibleTypeDeclare() {
+ SeaTunnelDataType<?> longType =
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_long", "long");
+ Assertions.assertEquals(BasicType.LONG_TYPE, longType);
+
+ SeaTunnelDataType<?> shortType =
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_short", "short");
+ Assertions.assertEquals(BasicType.SHORT_TYPE, shortType);
+
+ SeaTunnelDataType<?> byteType =
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_byte", "byte");
+ Assertions.assertEquals(BasicType.BYTE_TYPE, byteType);
+
+ ArrayType<?, ?> longArrayType =
+ (ArrayType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_long_array", "array<long>");
+ Assertions.assertEquals(ArrayType.LONG_ARRAY_TYPE, longArrayType);
+
+ ArrayType<?, ?> shortArrayType =
+ (ArrayType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_short_array", "array<short>");
+ Assertions.assertEquals(ArrayType.SHORT_ARRAY_TYPE, shortArrayType);
+
+ ArrayType<?, ?> byteArrayType =
+ (ArrayType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_byte_array", "array<byte>");
+ Assertions.assertEquals(ArrayType.BYTE_ARRAY_TYPE, byteArrayType);
+
+ MapType<?, ?> longMapType =
+ (MapType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_long_map", "map<long, long>");
+ Assertions.assertEquals(BasicType.LONG_TYPE, longMapType.getKeyType());
+ Assertions.assertEquals(BasicType.LONG_TYPE, longMapType.getValueType());
+
+ MapType<?, ?> shortMapType =
+ (MapType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_short_map", "map<short, short>");
+ Assertions.assertEquals(BasicType.SHORT_TYPE, shortMapType.getKeyType());
+ Assertions.assertEquals(BasicType.SHORT_TYPE, shortMapType.getValueType());
+
+ MapType<?, ?> byteMapType =
+ (MapType<?, ?>)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_byte_map", "map<byte, byte>");
+ Assertions.assertEquals(BasicType.BYTE_TYPE, byteMapType.getKeyType());
+ Assertions.assertEquals(BasicType.BYTE_TYPE, byteMapType.getValueType());
+
+ SeaTunnelRowType longRow =
+ (SeaTunnelRowType)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_long_row", "{c = long}");
+ Assertions.assertEquals(BasicType.LONG_TYPE, longRow.getFieldType(0));
+
+ SeaTunnelRowType shortRow =
+ (SeaTunnelRowType)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_short_row", "{c = short}");
+ Assertions.assertEquals(BasicType.SHORT_TYPE, shortRow.getFieldType(0));
+
+ SeaTunnelRowType byteRow =
+ (SeaTunnelRowType)
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "c_byte_row", "{c = byte}");
+ Assertions.assertEquals(BasicType.BYTE_TYPE, byteRow.getFieldType(0));
+ }
}
diff --git a/seatunnel-connectors-v2/connector-assert/pom.xml b/seatunnel-connectors-v2/connector-assert/pom.xml
index 6bbc76a5b1..dc141d2aa4 100644
--- a/seatunnel-connectors-v2/connector-assert/pom.xml
+++ b/seatunnel-connectors-v2/connector-assert/pom.xml
@@ -33,6 +33,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 21142e8158..bc73dbf151 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -17,28 +17,28 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.TemporalAccessor;
-import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -74,24 +74,28 @@ public class AssertExecutor {
Lists.newArrayList(rowType.getFieldNames()),
fieldName ->
fieldName.equals(assertFieldRule.getFieldName()));
+ SeaTunnelDataType<?> type = rowType.getFieldType(index);
Object value = rowData.getField(index);
- if (Objects.isNull(value)) {
- return Boolean.FALSE;
- }
+
Boolean typeChecked = checkType(value, assertFieldRule.getFieldType());
if (Boolean.FALSE.equals(typeChecked)) {
return Boolean.FALSE;
}
- Boolean valueChecked = checkValue(value, assertFieldRule.getFieldRules());
+ Boolean valueChecked = checkValue(value, type, assertFieldRule.getFieldRules());
if (Boolean.FALSE.equals(valueChecked)) {
return Boolean.FALSE;
}
return Boolean.TRUE;
}
- private Boolean checkValue(Object value, List<AssertFieldRule.AssertRule> fieldValueRules) {
+ private Boolean checkValue(
+ Object value,
+ SeaTunnelDataType<?> type,
+ List<AssertFieldRule.AssertRule> fieldValueRules) {
Optional<AssertFieldRule.AssertRule> failValueRule =
- fieldValueRules.stream().filter(valueRule -> !pass(value, valueRule)).findFirst();
+ fieldValueRules.stream()
+ .filter(valueRule -> !pass(value, type, valueRule))
+ .findFirst();
if (failValueRule.isPresent()) {
return Boolean.FALSE;
} else {
@@ -99,78 +103,256 @@ public class AssertExecutor {
}
}
- private boolean pass(Object value, AssertFieldRule.AssertRule valueRule) {
- if (AssertFieldRule.AssertRuleType.NOT_NULL.equals(valueRule.getRuleType())) {
- return Objects.nonNull(value);
+ private boolean pass(
+ Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+ AssertFieldRule.AssertRuleType ruleType = valueRule.getRuleType();
+ boolean isPass = true;
+ if (ruleType != null) {
+ isPass = checkAssertRule(value, type, valueRule);
}
- if (value instanceof Number
- && AssertFieldRule.AssertRuleType.MAX.equals(valueRule.getRuleType())) {
- return ((Number) value).doubleValue() <= valueRule.getRuleValue();
+ if (Objects.nonNull(value) && valueRule.getEqualTo() != null) {
+ isPass = isPass && compareValue(value, type, valueRule);
}
- if (value instanceof Number
- && AssertFieldRule.AssertRuleType.MIN.equals(valueRule.getRuleType())) {
- return ((Number) value).doubleValue() >= valueRule.getRuleValue();
+ return isPass;
+ }
+
+ private boolean checkAssertRule(
+ Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+ switch (valueRule.getRuleType()) {
+ case NULL:
+ return Objects.isNull(value);
+ case NOT_NULL:
+ return Objects.nonNull(value);
+ case MAX:
+ {
+ if (Objects.isNull(value) || !(value instanceof Number)) {
+ return Boolean.FALSE;
+ }
+ return ((Number) value).doubleValue() <= valueRule.getRuleValue();
+ }
+ case MIN:
+ {
+ if (Objects.isNull(value) || !(value instanceof Number)) {
+ return Boolean.FALSE;
+ }
+ return ((Number) value).doubleValue() >= valueRule.getRuleValue();
+ }
+ case MAX_LENGTH:
+ {
+ String valueStr =
+ Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+ return valueStr.length() <= valueRule.getRuleValue();
+ }
+ case MIN_LENGTH:
+ {
+ String valueStr =
+ Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+ return valueStr.length() >= valueRule.getRuleValue();
+ }
+ default:
+ return false;
}
- if (valueRule.getEqualTo() != null) {
- return compareValue(value, valueRule);
+ }
+
+ private boolean compareValue(
+ Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+ Object config = valueRule.getEqualTo();
+ String confJsonStr = JsonUtils.toJsonString(config);
+
+ JsonToRowConverters converters = new JsonToRowConverters(true, false);
+ JsonToRowConverters.JsonToRowConverter converter = converters.createConverter(type);
+
+ Object confValue;
+ try {
+ confValue =
+ converter.convert(JsonUtils.stringToJsonNode(JsonUtils.toJsonString(config)));
+ } catch (IOException e) {
+ throw CommonError.jsonOperationError("Assert", confJsonStr, e);
}
- String valueStr = Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
- if (AssertFieldRule.AssertRuleType.MAX_LENGTH.equals(valueRule.getRuleType())) {
- return valueStr.length() <= valueRule.getRuleValue();
+ return compareValue(value, type, confValue);
+ }
+
+ private boolean compareValue(Object value, SeaTunnelDataType<?> type, Object confValue) {
+ switch (type.getSqlType()) {
+ case ROW:
+ {
+ return compareRowValue(
+ (SeaTunnelRow) value,
+ (SeaTunnelRowType) type,
+ (SeaTunnelRow) confValue);
+ }
+ case ARRAY:
+ {
+ return compareArrayValue(
+ (Object[]) value, (ArrayType<?, ?>) type,
(Object[]) confValue);
+ }
+ case MAP:
+ {
+ return compareMapValue(
+ (Map<?, ?>) value, (MapType<?, ?>) type, (Map<?, ?>) confValue);
+ }
+ case NULL:
+ return value == null && confValue == null;
+ case BYTES:
+ {
+ return Arrays.equals((byte[]) value, (byte[]) confValue);
+ }
+ case STRING:
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ case TIME:
+ case TIMESTAMP:
+ case DATE:
+ default:
+ return value.equals(confValue);
}
+ }
- if (AssertFieldRule.AssertRuleType.MIN_LENGTH.equals(valueRule.getRuleType())) {
- return valueStr.length() >= valueRule.getRuleValue();
+ private boolean compareRowValue(
+ SeaTunnelRow value, SeaTunnelRowType type, SeaTunnelRow confValue) {
+ Object[] valFields = value.getFields();
+ Object[] confValFields = confValue.getFields();
+ if (valFields.length != confValFields.length) {
+ return false;
}
- return Boolean.TRUE;
+ for (int idx = 0; idx < confValFields.length; idx++) {
+ Object fieldVal = valFields[idx];
+ Object confField = confValFields[idx];
+ SeaTunnelDataType<?> fieldType = type.getFieldType(idx);
+ if (!compareValue(fieldVal, fieldType, confField)) {
+ return false;
+ }
+ }
+ return true;
}
- private boolean compareValue(Object value, AssertFieldRule.AssertRule valueRule) {
- if (value instanceof String) {
- return value.equals(valueRule.getEqualTo());
- } else if (value instanceof Integer) {
- return value.equals(Integer.parseInt(valueRule.getEqualTo()));
- } else if (value instanceof Long) {
- return value.equals(Long.parseLong(valueRule.getEqualTo()));
- } else if (value instanceof Short) {
- return value.equals(Short.parseShort(valueRule.getEqualTo()));
- } else if (value instanceof Float) {
- return value.equals((Float.parseFloat(valueRule.getEqualTo())));
- } else if (value instanceof Byte) {
- return value.equals((Byte.parseByte(valueRule.getEqualTo())));
- } else if (value instanceof Double) {
- return value.equals(Double.parseDouble(valueRule.getEqualTo()));
- } else if (value instanceof BigDecimal) {
- return value.equals(new BigDecimal(valueRule.getEqualTo()));
- } else if (value instanceof Boolean) {
- return value.equals(Boolean.parseBoolean(valueRule.getEqualTo()));
- } else if (value instanceof LocalDateTime) {
- TemporalAccessor parsedTimestamp =
- DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(valueRule.getEqualTo());
- LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
- LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
- return ((LocalDateTime) value).isEqual(LocalDateTime.of(localDate, localTime));
- } else if (value instanceof LocalDate) {
- DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- return ((LocalDate) value).isEqual(LocalDate.parse(valueRule.getEqualTo(), fmt));
- } else if (value instanceof LocalTime) {
- DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss");
- return value.equals(LocalTime.parse(valueRule.getEqualTo(), fmt));
- } else {
- throw new AssertConnectorException(
- AssertConnectorErrorCode.TYPES_NOT_SUPPORTED_FAILED,
- String.format(" %s types not supported yet", value.getClass().getSimpleName()));
+ private boolean compareArrayValue(Object[] value, ArrayType<?, ?> type, Object[] confValue) {
+ if (value.length != confValue.length) {
+ return false;
+ }
+
+ SeaTunnelDataType<?> elementType = type.getElementType();
+ for (int idx = 0; idx < confValue.length; idx++) {
+ Object elementVal = value[idx];
+ Object confElement = confValue[idx];
+ if (!compareValue(elementVal, elementType, confElement)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean compareMapValue(Map<?, ?> value, MapType<?, ?> type, Map<?, ?> confValue) {
+ if (value.size() != confValue.size()) {
+ return false;
+ }
+
+ if (value.isEmpty()) {
+ return true;
+ }
+
+ SeaTunnelDataType<?> valType = type.getValueType();
+ for (Map.Entry<?, ?> entry : confValue.entrySet()) {
+ Object confKey = entry.getKey();
+ Object confVal = entry.getValue();
+ if (!value.containsKey(confKey)) {
+ return false;
+ }
+
+ Object val = value.get(confKey);
+ if (!compareValue(val, valType, confVal)) {
+ return false;
+ }
}
+ return true;
}
private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
+ if (value == null) {
+ if (fieldType.getSqlType() == SqlType.NULL) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ if (fieldType.getSqlType() == SqlType.ROW) {
+ return checkRowType(value, (SeaTunnelRowType) fieldType);
+ }
+
+ if (fieldType.getSqlType() == SqlType.ARRAY) {
+ return checkArrayType(value, (ArrayType<?, ?>) fieldType);
+ }
+
+ if (fieldType.getSqlType() == SqlType.MAP) {
+ return checkMapType(value, (MapType) fieldType);
+ }
+
if (fieldType.getSqlType() == SqlType.DECIMAL) {
return checkDecimalType(value, fieldType);
}
+
return value.getClass().equals(fieldType.getTypeClass());
}
+ private boolean checkArrayType(Object value, ArrayType<?, ?> fieldType) {
+ if (!value.getClass().isArray()) {
+ return false;
+ }
+
+ Object[] val = (Object[]) value;
+ SeaTunnelDataType<?> elementType = fieldType.getElementType();
+
+ for (Object elementObj : val) {
+ if (!checkType(elementObj, elementType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean checkMapType(Object value, MapType<?, ?> fieldType) {
+ if (!(value instanceof Map)) {
+ return false;
+ }
+
+ Map<?, ?> val = (Map<?, ?>) value;
+ SeaTunnelDataType<?> keyType = fieldType.getKeyType();
+ SeaTunnelDataType<?> valType = fieldType.getValueType();
+ for (Map.Entry<?, ?> entry : val.entrySet()) {
+ Object keyObj = entry.getKey();
+ Object valObj = entry.getValue();
+ if (!(checkType(keyObj, keyType) && checkType(valObj, valType))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean checkRowType(Object value, SeaTunnelRowType rowType) {
+ if (!(value instanceof SeaTunnelRow)) {
+ return false;
+ }
+
+ SeaTunnelRow row = (SeaTunnelRow) value;
+ Object[] fields = row.getFields();
+ for (int idx = 0; idx < fields.length; idx++) {
+ Object fieldVal = fields[idx];
+ SeaTunnelDataType<?> fieldType = rowType.getFieldType(idx);
+ if (!checkType(fieldVal, fieldType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?> fieldType) {
if (!value.getClass().equals(fieldType.getTypeClass())) {
return false;
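Note on the comparison logic above: compareValue recurses through ROW, ARRAY and MAP values and only calls equals() on leaves. The following is a minimal standalone sketch of that idea, not SeaTunnel code. It assumes dispatch on runtime classes rather than on SeaTunnelDataType, and the class name DeepCompareSketch is hypothetical. Unlike the default branch in the diff, the sketch treats two null leaves as equal instead of calling equals() on them.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: structural equality for nested arrays and maps.
// It dispatches on runtime classes; the commit dispatches on SqlType.
final class DeepCompareSketch {

    static boolean deepEquals(Object actual, Object expected) {
        // Null leaves: equal only when both sides are null.
        if (actual == null || expected == null) {
            return actual == expected;
        }
        // Byte arrays are compared by content, as in the BYTES case.
        if (actual instanceof byte[] && expected instanceof byte[]) {
            return Arrays.equals((byte[]) actual, (byte[]) expected);
        }
        // Object arrays: same length, then element-wise recursion.
        if (actual instanceof Object[] && expected instanceof Object[]) {
            Object[] a = (Object[]) actual;
            Object[] e = (Object[]) expected;
            if (a.length != e.length) {
                return false;
            }
            for (int i = 0; i < e.length; i++) {
                if (!deepEquals(a[i], e[i])) {
                    return false;
                }
            }
            return true;
        }
        // Maps: same size, every expected key present, values recursed.
        if (actual instanceof Map && expected instanceof Map) {
            Map<?, ?> a = (Map<?, ?>) actual;
            Map<?, ?> e = (Map<?, ?>) expected;
            if (a.size() != e.size()) {
                return false;
            }
            for (Map.Entry<?, ?> entry : e.entrySet()) {
                if (!a.containsKey(entry.getKey())) {
                    return false;
                }
                if (!deepEquals(a.get(entry.getKey()), entry.getValue())) {
                    return false;
                }
            }
            return true;
        }
        // Leaves fall back to equals().
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        Map<String, Object> actual = new LinkedHashMap<>();
        actual.put("k0", new Integer[] {0, 1, 2});
        Map<String, Object> expected = new LinkedHashMap<>();
        expected.put("k0", new Integer[] {0, 1, 2});
        System.out.println(deepEquals(actual, expected));
        expected.put("k0", new Integer[] {0, 1, 3});
        System.out.println(deepEquals(actual, expected));
    }
}
```

Checking sizes before iterating the expected side, as the diff does, is what makes a one-directional key lookup sufficient for maps.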
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
index 12d2e28bad..f3735c39f1 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
@@ -34,7 +34,7 @@ public class AssertFieldRule implements Serializable {
public static class AssertRule implements Serializable {
private AssertRuleType ruleType;
private Double ruleValue;
- private String equalTo;
+ private Object equalTo;
}
/**
@@ -42,6 +42,8 @@ public class AssertFieldRule implements Serializable {
* break the rule
*/
public enum AssertRuleType {
+ /** value can be null */
+ NULL,
/** value can't be null */
NOT_NULL,
/** minimum value of the data */
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index 842dcabd90..3b1e3594b2 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -18,18 +18,15 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
@@ -39,11 +36,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_VALUE;
+@Slf4j
public class AssertRuleParser {
-
- private static final Pattern DECIMAL_TYPE_PATTERN =
- Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");
-
public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends Config> rowRuleList) {
return assembleFieldValueRules(rowRuleList);
@@ -58,9 +52,43 @@ public class AssertRuleParser {
.map(
config -> {
AssertFieldRule fieldRule = new AssertFieldRule();
+ String fieldName = config.getString(FIELD_NAME);
fieldRule.setFieldName(config.getString(FIELD_NAME));
if (config.hasPath(FIELD_TYPE)) {
- fieldRule.setFieldType(getFieldType(config.getString(FIELD_TYPE)));
+ ConfigValue fieldTypeConf = config.getValue(FIELD_TYPE);
+ switch (fieldTypeConf.valueType()) {
+ case STRING:
+ {
+ String basicTypeStr = config.getString(FIELD_TYPE);
+ SeaTunnelDataType<?> fieldType =
+ SeaTunnelDataTypeConvertorUtil
+ .deserializeSeaTunnelDataType(
+ fieldName, basicTypeStr);
+ fieldRule.setFieldType(fieldType);
+ }
+ ;
+ break;
+ case OBJECT:
+ {
+ ConfigObject rowTypeConf = config.getObject(FIELD_TYPE);
+ SeaTunnelDataType<?> fieldType =
+ SeaTunnelDataTypeConvertorUtil
+ .deserializeSeaTunnelDataType(
+ fieldName,
+ rowTypeConf.render());
+ fieldRule.setFieldType(fieldType);
+ }
+ ;
+ break;
+ case BOOLEAN:
+ case NUMBER:
+ case LIST:
+ case NULL:
+ log.warn(
+ String.format(
+ "Assert Field Rule[%s] doesn't support '%s' type value.",
+ FIELD_TYPE, fieldTypeConf.valueType()));
+ }
}
if (config.hasPath(FIELD_VALUE)) {
@@ -88,39 +116,10 @@ public class AssertRuleParser {
valueRule.setRuleValue(config.getDouble(RULE_VALUE));
}
if (config.hasPath(EQUALS_TO)) {
- valueRule.setEqualTo(config.getString(EQUALS_TO));
+ valueRule.setEqualTo(config.getValue(EQUALS_TO).unwrapped());
}
return valueRule;
})
.collect(Collectors.toList());
}
-
- private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
- final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
- Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
- if (matcher.find()) {
- int precision = Integer.parseInt(matcher.group(1));
- int scale = Integer.parseInt(matcher.group(2));
- return new DecimalType(precision, scale);
- }
- return TYPES.get(normalTypeStr);
- }
-
- private static final Map<String, SeaTunnelDataType<?>> TYPES = Maps.newHashMap();
-
- static {
- TYPES.put("string", BasicType.STRING_TYPE);
- TYPES.put("boolean", BasicType.BOOLEAN_TYPE);
- TYPES.put("byte", BasicType.BYTE_TYPE);
- TYPES.put("short", BasicType.SHORT_TYPE);
- TYPES.put("int", BasicType.INT_TYPE);
- TYPES.put("long", BasicType.LONG_TYPE);
- TYPES.put("float", BasicType.FLOAT_TYPE);
- TYPES.put("double", BasicType.DOUBLE_TYPE);
- TYPES.put("void", BasicType.VOID_TYPE);
- TYPES.put("timestamp", LocalTimeType.LOCAL_DATE_TIME_TYPE);
- TYPES.put("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE);
- TYPES.put("date", LocalTimeType.LOCAL_DATE_TYPE);
- TYPES.put("time", LocalTimeType.LOCAL_TIME_TYPE);
- }
}
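Note on the parser change above: field_type may now be either a HOCON string (a basic or parameterized type such as "decimal(30, 8)") or a HOCON object (a row type), and any other value kind is only logged as a warning. The sketch below mirrors that three-way dispatch on plain Java values. It is an assumption-laden illustration: it classifies values as ConfigValue.unwrapped() would return them (String, Map, List, Number, Boolean or null) instead of using ConfigValueType, and the names FieldTypeKindSketch, Kind and classify are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;

// Hypothetical sketch: classify an unwrapped field_type value the way the
// parser's switch on ConfigValueType does (STRING, OBJECT, everything else).
final class FieldTypeKindSketch {

    enum Kind {
        BASIC_TYPE_STRING, // e.g. "int", "array<int>", "decimal(30, 8)"
        ROW_TYPE_OBJECT,   // e.g. {c_0 = {c_0_0 = int}}
        UNSUPPORTED        // booleans, numbers, lists, null
    }

    static Kind classify(Object unwrapped) {
        if (unwrapped instanceof String) {
            return Kind.BASIC_TYPE_STRING;
        }
        if (unwrapped instanceof java.util.Map) {
            return Kind.ROW_TYPE_OBJECT;
        }
        return Kind.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(classify("decimal(30, 8)"));
        System.out.println(classify(Collections.singletonMap("c_0", "int")));
        System.out.println(classify(Arrays.asList(1, 2)));
        System.out.println(classify(null));
    }
}
```

In the commit itself both supported branches end in the same deserializeSeaTunnelDataType call; only the way the type text is obtained (getString versus render) differs.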
diff --git a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
index 48b31989c6..96b61bfaae 100644
--- a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
+++ b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -17,21 +17,39 @@
package org.apache.seatunnel.flink.assertion;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Base64;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -118,48 +136,285 @@ public class AssertExecutorTest {
@Test
public void testDecimalTypeCheck() {
+ assertFieldRuleNotNull(new DecimalType(10, 2), new BigDecimal("99999999.90"));
+ }
+
+ @Test
+ public void testDecimalTypeCheckError() {
List<AssertFieldRule> rules = Lists.newArrayList();
AssertFieldRule rule = new AssertFieldRule();
rule.setFieldName("c_mock");
- DecimalType assertFieldType = new DecimalType(10, 2);
+ DecimalType assertFieldType = new DecimalType(1, 0);
rule.setFieldType(assertFieldType);
AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
- valueRule.setEqualTo("99999999.90");
-
+ valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
rule.setFieldRules(Collections.singletonList(valueRule));
rules.add(rule);
- SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new BigDecimal("99999999.90")});
+ SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
SeaTunnelRowType mockType =
new SeaTunnelRowType(
new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
- assertNull(failRule);
+ assertNotNull(failRule);
+ assertEquals(assertFieldType, failRule.getFieldType());
+ assertEquals("c_mock", failRule.getFieldName());
}
@Test
- public void testDecimalTypeCheckError() {
+ public void testDecimalEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"999999.90\" }").getValue("equals_to"),
+ new DecimalType(10, 2),
+ new BigDecimal("999999.90"));
+ }
+
+ @Test
+ public void testRowTypeCheck() {
+ SeaTunnelRowType assertFieldType =
+ new SeaTunnelRowType(
+ new String[] {"c_0"}, new SeaTunnelDataType[] {BasicType.INT_TYPE});
+ assertFieldRuleNotNull(assertFieldType, new SeaTunnelRow(new Object[] {0}));
+ }
+
+ @Test
+ public void testRowEqualsTo() {
+ SeaTunnelRowType assertFieldType =
+ new SeaTunnelRowType(
+ new String[] {"c_0", "c_1"},
+ new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.STRING_TYPE});
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = [0, \"xx\"]}").getValue("equals_to"),
+ assertFieldType,
+ new SeaTunnelRow(new Object[] {0, "xx"}));
+ }
+
+ @Test
+ public void testNestRowEqualsTo() {
+ SeaTunnelRowType assertFieldType =
+ new SeaTunnelRowType(
+ new String[] {"c_0"},
+ new SeaTunnelDataType[] {
+ new SeaTunnelRowType(
+ new String[] {"c_0_0"},
+ new SeaTunnelDataType[] {BasicType.INT_TYPE})
+ });
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = [[1]]}").getValue("equals_to"),
+ assertFieldType,
+ new SeaTunnelRow(new Object[] {new SeaTunnelRow(new Object[] {1})}));
+ }
+
+ @Test
+ public void testArrayTypeCheck() {
+ assertFieldRuleNotNull(ArrayType.INT_ARRAY_TYPE, new Integer[] {0, 1, 2});
+ }
+
+ @Test
+ public void testArrayEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = [0, 1, 2]}").getValue("equals_to"),
+ ArrayType.INT_ARRAY_TYPE,
+ new Integer[] {0, 1, 2});
+ }
+
+ @Test
+ public void testMapTypeCheck() {
+ Map<String, String> map = new HashMap<>();
+ map.put("k0", "v0");
+ assertFieldRuleNotNull(
+ new MapType<String, String>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), map);
+ }
+
+ @Test
+ public void testMapEqualsTo() {
+ Map<String, String> map = new HashMap<>();
+ map.put("k0", "v0");
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = { k0 = v0 } }").getValue("equals_to"),
+ new MapType<String, String>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ map);
+ }
+
+ @Test
+ public void testNullTypeCheck() {
+ assertFieldRuleNull(BasicType.VOID_TYPE, null);
+ }
+
+ @Test
+ public void testStringEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"string\" }").getValue("equals_to"),
+ BasicType.STRING_TYPE,
+ "string");
+ }
+
+ @Test
+ public void testBooleanEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = false }").getValue("equals_to"),
+ BasicType.BOOLEAN_TYPE,
+ false);
+ }
+
+ @Test
+ public void testTinyIntEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 1 }").getValue("equals_to"),
+ BasicType.BYTE_TYPE,
+ (byte) 1);
+ }
+
+ @Test
+ public void testSmallIntEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 1 }").getValue("equals_to"),
+ BasicType.SHORT_TYPE,
+ (short) 1);
+ }
+
+ @Test
+ public void testIntEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 333 }").getValue("equals_to"),
+ BasicType.INT_TYPE,
+ (int) 333);
+ }
+
+ @Test
+ public void testBigIntEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 323232 }").getValue("equals_to"),
+ BasicType.LONG_TYPE,
+ (long) 323232L);
+ }
+
+ @Test
+ public void testFloatEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 3.1 }").getValue("equals_to"),
+ BasicType.FLOAT_TYPE,
+ (float) 3.1);
+ }
+
+ @Test
+ public void testDoubleEqualsTo() {
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = 19.33333 }").getValue("equals_to"),
+ BasicType.DOUBLE_TYPE,
+ (double) 19.33333);
+ }
+
+ @Test
+ public void testBytesEqualsTo() throws IOException {
+ byte[] bytes = "010101".getBytes();
+ String base64Str = Base64.getEncoder().encodeToString("010101".getBytes());
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"" + base64Str + "\" }")
+ .getValue("equals_to"),
+ PrimitiveByteArrayType.INSTANCE,
+ (byte[]) bytes);
+ }
+
+ @Test
+ public void testDateEqualsTo() throws IOException {
+ String dateStr = "2024-01-24";
+ LocalDate date =
+ DateTimeFormatter.ISO_LOCAL_DATE.parse(dateStr).query(TemporalQueries.localDate());
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"" + dateStr + "\" }")
+ .getValue("equals_to"),
+ LocalTimeType.LOCAL_DATE_TYPE,
+ (LocalDate) date);
+ }
+
+ @Test
+ public void testTimeEqualsTo() throws IOException {
+ String timeStr = "12:11:34";
+ LocalTime time =
+ JsonToRowConverters.TIME_FORMAT.parse(timeStr).query(TemporalQueries.localTime());
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"" + timeStr + "\" }")
+ .getValue("equals_to"),
+ LocalTimeType.LOCAL_TIME_TYPE,
+ (LocalTime) time);
+ }
+
+ @Test
+ public void testTimestampEqualsTo() throws IOException {
+ String timestampStr = "2024-01-24T12:11:34.123";
+ TemporalAccessor parsedTimestamp =
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(timestampStr);
+ LocalTime time = parsedTimestamp.query(TemporalQueries.localTime());
+ LocalDate date = parsedTimestamp.query(TemporalQueries.localDate());
+ LocalDateTime timestamp = LocalDateTime.of(date, time);
+ assertFieldRuleEqualsTo(
+ ConfigFactory.parseString("{equals_to = \"" + timestampStr + "\" }")
+ .getValue("equals_to"),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ (LocalDateTime) timestamp);
+ }
+
+ private void assertFieldRuleNotNull(SeaTunnelDataType<?> type, Object value) {
+ assertFieldRuleMayNull(type, value, false);
+ }
+
+ private void assertFieldRuleNull(SeaTunnelDataType<?> type, Object value) {
+ assertFieldRuleMayNull(type, value, true);
+ }
+
+ private void assertFieldRuleMayNull(SeaTunnelDataType<?> type, Object value, boolean isNull) {
List<AssertFieldRule> rules = Lists.newArrayList();
AssertFieldRule rule = new AssertFieldRule();
rule.setFieldName("c_mock");
- DecimalType assertFieldType = new DecimalType(1, 0);
- rule.setFieldType(assertFieldType);
+ rule.setFieldType(type);
AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
- valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
+ valueRule.setRuleType(
+ isNull
+ ? AssertFieldRule.AssertRuleType.NULL
+ : AssertFieldRule.AssertRuleType.NOT_NULL);
+
rule.setFieldRules(Collections.singletonList(valueRule));
rules.add(rule);
- SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
+ SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {value});
SeaTunnelRowType mockType =
- new SeaTunnelRowType(
- new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
+ new SeaTunnelRowType(new String[] {"c_mock"}, new SeaTunnelDataType[] {type});
AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
- assertNotNull(failRule);
- assertEquals(assertFieldType, failRule.getFieldType());
- assertEquals("c_mock", failRule.getFieldName());
+ assertNull(failRule);
+ }
+
+ private void assertFieldRuleEqualsTo(
+ ConfigValue equalsTo, SeaTunnelDataType<?> type, Object expected) {
+ assertFieldRuleEqualsTo(equalsTo, type, expected, true);
+ }
+
+ private void assertFieldRuleEqualsTo(
+ ConfigValue equalsTo, SeaTunnelDataType<?> type, Object expected, boolean isEqualsTo) {
+ List<AssertFieldRule> rules = Lists.newArrayList();
+ AssertFieldRule rule = new AssertFieldRule();
+ rule.setFieldName("c_mock");
+ rule.setFieldType(type);
+
+ AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
+ valueRule.setEqualTo(equalsTo.unwrapped());
+
+ rule.setFieldRules(Collections.singletonList(valueRule));
+ rules.add(rule);
+
+ SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {expected});
+ SeaTunnelRowType mockType =
+ new SeaTunnelRowType(new String[] {"c_mock"}, new SeaTunnelDataType[] {type});
+
+ AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
+ if (isEqualsTo) {
+ assertNull(failRule);
+ } else {
+ assertNotNull(failRule);
+ }
}
}
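Note on the DECIMAL tests above: in compareValue the DECIMAL case falls through to value.equals(confValue). In plain Java, BigDecimal.equals compares scale as well as numeric value, while compareTo compares numeric value only. The sketch below shows the difference; the class name DecimalEqualsSketch and the helper sameNumber are hypothetical. Whether the JSON converter normalizes the scale of the configured value before the comparison is not visible in this diff, so treat this as background rather than as a description of AssertSink behavior.

```java
import java.math.BigDecimal;

// Hypothetical sketch: equals() versus compareTo() on BigDecimal.
final class DecimalEqualsSketch {

    // Scale-insensitive comparison: 999999.90 and 999999.9 are the same number.
    static boolean sameNumber(BigDecimal a, BigDecimal b) {
        return a.compareTo(b) == 0;
    }

    public static void main(String[] args) {
        BigDecimal withTwoDigits = new BigDecimal("999999.90");
        BigDecimal withOneDigit = new BigDecimal("999999.9");

        // equals() also compares scale (2 versus 1), so this prints false.
        System.out.println(withTwoDigits.equals(withOneDigit));
        // compareTo() ignores scale, so this prints true.
        System.out.println(sameNumber(withTwoDigits, withOneDigit));
    }
}
```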
diff --git a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
index 494c3c4496..b4ebe7bb9e 100644
--- a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
+++ b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -22,11 +22,14 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +41,7 @@ public class AssertRuleParserTest {
public void testParseRules() {
List<? extends Config> ruleConfigList = assembleConfig();
List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
- assertEquals(3, assertFieldRules.size());
+ assertEquals(4, assertFieldRules.size());
AssertFieldRule nameRule = assertFieldRules.get(0);
List<AssertFieldRule.AssertRule> nameValueRules = nameRule.getFieldRules();
@@ -70,7 +73,28 @@ public class AssertRuleParserTest {
assertEquals(2, decimalValueRules.size());
assertEquals(
AssertFieldRule.AssertRuleType.NOT_NULL, decimalValueRules.get(0).getRuleType());
- assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
+ assertEquals("12.12", (String) decimalValueRules.get(1).getEqualTo());
+
+ AssertFieldRule rowRule = assertFieldRules.get(3);
+ List<AssertFieldRule.AssertRule> rowValueRules = rowRule.getFieldRules();
+ SeaTunnelRowType expectedRowType =
+ new SeaTunnelRowType(
+ new String[] {"c_0"},
+ new SeaTunnelDataType[] {
+ new SeaTunnelRowType(
+ new String[] {"c_0_0"},
+ new SeaTunnelDataType[] {BasicType.INT_TYPE})
+ });
+ assertEquals("c_row", rowRule.getFieldName());
+ assertEquals(expectedRowType, rowRule.getFieldType());
+ assertEquals(2, rowValueRules.size());
+ assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, rowValueRules.get(0).getRuleType());
+
+ final List<List<?>> cRow = (List<List<?>>) rowValueRules.get(1).getEqualTo();
+ assertEquals(1, cRow.size());
+ assertEquals(ArrayList.class, cRow.get(0).getClass());
+ assertEquals(1, ((List) cRow.get(0)).size());
+ assertEquals(1, ((Integer) ((List) cRow.get(0)).get(0)));
}
private List<? extends Config> assembleConfig() {
@@ -119,6 +143,17 @@ public class AssertRuleParserTest {
+ " equals_to = \"12.12\"\n"
+ " }\n"
+ " ]\n"
+ + " },{\n"
+ + " field_name = c_row\n"
+ + " field_type= {c_0 = {c_0_0=int}}\n"
+ + " field_value = [\n"
+ + " {\n"
+ + " rule_type = NOT_NULL\n"
+ + " },\n"
+ + " {\n"
+ + " equals_to = [[1]]\n"
+ + " }\n"
+ + " ]\n"
+ " }\n"
+ " ]\n"
+ " \n"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
index fbdf6733a9..277d048a6d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.e2e.connector.assertion;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
@@ -35,4 +37,24 @@ public class FakeSourceToAssertIT extends TestSuiteBase {
container.executeJob("/assertion/fakesource_to_assert.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ @TestTemplate
+ public void testFakeSourceToAssertRowSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/assertion/fake_row_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK},
+ disabledReason = "Currently FLINK engine unsupported NULL type")
+ public void testFakeFullTypesToAssertSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/assertion/fake_full_types_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
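Note on bytes values: testBytesEqualsTo above builds its equals_to literal with Base64.getEncoder(), and the e2e config that follows writes c_bytes as "bWlJWmo=". The sketch below shows how such a literal can be produced and checked with the JDK alone. The class name BytesLiteralSketch and its two helper methods are hypothetical; only java.util.Base64 and java.util.Arrays are real APIs here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical sketch: writing and checking a Base64 literal for a bytes field.
final class BytesLiteralSketch {

    // Produces the text that would go into the config for a bytes value.
    static String toConfigLiteral(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    // Decodes the literal and compares contents, since byte[] has no
    // content-based equals().
    static boolean matches(byte[] actual, String base64Literal) {
        return Arrays.equals(actual, Base64.getDecoder().decode(base64Literal));
    }

    public static void main(String[] args) {
        byte[] raw = "010101".getBytes(StandardCharsets.UTF_8);
        String literal = toConfigLiteral(raw);
        System.out.println(literal);
        System.out.println(matches(raw, literal));
    }
}
```

Comparing with Arrays.equals matches the BYTES branch of compareValue in this commit, which also compares array contents rather than references.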
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf
new file mode 100644
index 0000000000..96cf30306e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = BATCH
+ # checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ row.num = 1
+ schema = {
+ fields {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<date, string>"
+ c_map_nest = "map<string, {c_int = int, c_string = string}>"
+ c_row = {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<string, string>"
+ }
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ { "2024-01-26" = v0 },
+ { k1 = [123, "BBB-BB"]},
+ [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ { k0 = v0 }
+ ]
+ ]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink{
+ Assert {
+ source_table_name = "fake"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_null
+ field_type = "null"
+ field_value = [
+ {
+ rule_type = NULL
+ }
+ ]
+ },
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "AAA"
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = false
+ }
+ ]
+ },
+ {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 1
+ }
+ ]
+ },
+ {
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 1
+ }
+ ]
+ },
+ {
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 333
+ }
+ ]
+ },
+ {
+ field_name = c_bigint
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 323232
+ }
+ ]
+ },
+ {
+ field_name = c_float
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 3.1
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 9.33333
+ }
+ ]
+ },
+ {
+ field_name = c_decimal
+ field_type = "decimal(30, 8)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 99999.99999999
+ }
+ ]
+ },
+ {
+ field_name = c_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2012-12-21"
+ }
+ ]
+ },
+ {
+ field_name = c_timestamp
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2012-12-21T12:34:56"
+ }
+ ]
+ },
+ {
+ field_name = c_time
+ field_type = time
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "12:34:56"
+ }
+ ]
+ },
+ {
+ field_name = c_bytes
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "bWlJWmo="
+ }
+ ]
+ },
+ {
+ field_name = c_array
+ field_type = "array<int>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [0, 1, 2]
+ }
+ ]
+ },
+ {
+ field_name = c_map
+ field_type = "map<date, string>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = { "2024-01-26" = v0 }
+ }
+ ]
+ },
+ {
+ field_name = c_map_nest
+ field_type = "map<string, {c_int = int, c_string = string}>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = { k1 = [123, "BBB-BB"] }
+ }
+ ]
+ },
+ {
+ field_name = c_row
+ field_type = {
+ c_null = "null"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ c_bytes = bytes
+ c_array = "array<int>"
+ c_map = "map<string, string>"
+ }
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [
+ null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+ "bWlJWmo=",
+ [0, 1, 2],
+ { k0 = v0 }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf
new file mode 100644
index 0000000000..731724a417
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = BATCH
+ # checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ row.num = 1
+ schema = {
+ fields {
+ c_array = "array<int>"
+ c_map = "map<string, string>"
+ c_row = {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [[0, 1, 2], { k0 = v0 }, ["AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56"]]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink{
+ Assert {
+ source_table_name = "fake"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_array
+ field_type = "array<int>"
+ field_value = [
+ {
+ equals_to = [0, 1, 2]
+ }
+ ]
+ },
+ {
+ field_name = c_map
+ field_type = "map<string, string>"
+ field_value = [
+ {
+ equals_to = { k0 = v0 }
+ }
+ ]
+ },
+ {
+ field_name = c_row
+ field_type = {
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_date = date
+ c_timestamp = timestamp
+ }
+ field_value = [
+ {
+ equals_to = ["AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56"]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
index 49fd9807df..58f30a56cc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
@@ -79,7 +79,7 @@ sink {
},
{
field_name = c_tinyint
- field_type = byte
+ field_type = tinyint
field_value = [
{
rule_type = MIN
@@ -93,7 +93,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
rule_type = MIN
@@ -121,7 +121,7 @@ sink {
},
{
field_name = c_bigint
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
index 8a034f6094..9a1113b4b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
@@ -80,7 +80,7 @@ sink {
},
{
field_name = c_tinyint
- field_type = byte
+ field_type = tinyint
field_value = [
{
rule_type = MIN
@@ -94,7 +94,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
rule_type = MIN
@@ -122,7 +122,7 @@ sink {
},
{
field_name = c_bigint
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
index cfc734e6bf..e03ff0e52c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
@@ -119,7 +119,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
equals_to = 15920
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
index a70a51ee87..a4d0f4468e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
@@ -118,7 +118,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
equals_to = 13846
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
index a17fd8679b..05d50539c3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
@@ -121,7 +121,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
equals_to = 15920
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
index 7417e74d27..9709dd15e2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
@@ -120,7 +120,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
equals_to = 13846
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index 969e85e123..58cf96fe38 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -64,7 +64,7 @@ sink {
field_rules = [
{
field_name = f1
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
index 97637f9a04..35e27e7718 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -67,7 +67,7 @@ sink {
field_rules = [
{
field_name = f1
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index 0f2b909c12..b8dd877997 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -65,17 +65,17 @@ sink{
},
{
field_name = hive_e2e_source_table.bigint_column
- field_type = long
+ field_type = bigint
field_value = [{equals_to = 1234567890}]
},
{
field_name = hive_e2e_source_table.smallint_column
- field_type = short
+ field_type = smallint
field_value = [{equals_to = 32767}]
},
{
field_name = hive_e2e_source_table.tinyint_column
- field_type = byte
+ field_type = tinyint
field_value = [{equals_to = 127}]
},
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index 05c64e0ebe..31fe77a3e2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -65,7 +65,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index 58b3474065..f9a41e7987 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -64,7 +64,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 16251c344f..b5d63d204e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -55,7 +55,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index 11b7b35042..b6db50989a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -74,7 +74,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 3419175228..45b29d1915 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -74,7 +74,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 57129b040c..dabbbdb7db 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -55,7 +55,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 100de6d2b6..46a8c5f0dc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -53,7 +53,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index cb898a759d..39c16a2a3b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -57,7 +57,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 26ee02d614..bab9f197c0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -54,7 +54,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index 4d4d8536c7..36f01c0337 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -66,7 +66,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
index d757a36cb5..d7f875272b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -127,7 +127,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
index 0e1aa6a949..beff2cfac6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -103,7 +103,7 @@ sink {
},
{
field_name = c_tinyint
- field_type = byte
+ field_type = tinyint
field_value = [
{
rule_type = NOT_NULL
@@ -112,7 +112,7 @@ sink {
},
{
field_name = c_smallint
- field_type = short
+ field_type = smallint
field_value = [
{
rule_type = NOT_NULL
@@ -130,7 +130,7 @@ sink {
},
{
field_name = c_bigint
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index b52dd20eb4..9ba89caf86 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -69,7 +69,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
index b60de4284f..09ab946964 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -70,7 +70,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
index d047e310ba..b7c9aff4f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -48,7 +48,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
index b631e243b1..eabf53242b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -47,7 +47,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
index fac6fe305c..d9a6a51265 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -48,7 +48,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
index 833ed1621d..cee9f1e273 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -52,7 +52,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
index d3ce2a72df..0de010a26c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -50,7 +50,7 @@ sink {
field_rules = [
{
field_name = id
- field_type = long
+ field_type = bigint
field_value = [
{
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
index a91a9eff74..042b66ad25 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
@@ -123,49 +123,49 @@ sink {
},
{
field_name = "test"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 283}
]
},
{
field_name = "c2_1"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 283}
]
},
{
field_name = "c2_2"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 0}
]
},
{
field_name = "c2_3"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 9}
]
},
{
field_name = "c2_4"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 6791}
]
},
{
field_name = "c2_5"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 407460}
]
},
{
field_name = "c2_6"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 24447611}
]
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
index 2cf38bb4d3..ebeea3659c 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
@@ -79,21 +79,21 @@ sink {
},
{
field_name = "c4_1"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 64}
]
},
{
field_name = "c4_2"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 8}
]
},
{
field_name = "c4_3"
- field_type = "long"
+ field_type = bigint
field_value = [
{equals_to = 8}
]
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 648b2c98af..dd91ac5a87 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import java.io.IOException;
@@ -351,6 +352,7 @@ public class JsonToRowConverters implements Serializable {
}
private JsonToRowConverter createMapConverter(MapType<?, ?> type) {
+ JsonToRowConverter keyConverter = createConverter(type.getKeyType());
JsonToRowConverter valueConverter = createConverter(type.getValueType());
return new JsonToRowConverter() {
@Override
@@ -361,8 +363,17 @@ public class JsonToRowConverters implements Serializable {
new Consumer<Map.Entry<String, JsonNode>>() {
@Override
public void accept(Map.Entry<String, JsonNode> entry) {
+ JsonNode keyNode;
+ try {
+ keyNode =
+ JsonUtils.stringToJsonNode(
+ JsonUtils.toJsonString(entry.getKey()));
+ } catch (Exception e) {
+ throw CommonError.jsonOperationError(
+ FORMAT, entry.getKey(), e);
+ }
value.put(
- entry.getKey(),
+ keyConverter.convert(keyNode),
valueConverter.convert(entry.getValue()));
}
});
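
Note on the hunk above: the map converter now passes each JSON object key through a converter for the map's declared key type instead of storing the raw string key. A minimal standalone sketch of that idea follows. It uses only the JDK; `TypedKeyMapSketch`, `convertKeys`, and the `keyParser` argument are hypothetical names chosen for illustration and are not part of SeaTunnel or of this commit.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class TypedKeyMapSketch {

    // Hypothetical helper: JSON object keys always arrive as strings, so each key
    // is parsed into the declared key type before it is stored in the result map.
    public static <K, V> Map<K, V> convertKeys(
            Map<String, V> raw, Function<String, K> keyParser) {
        Map<K, V> typed = new LinkedHashMap<>();
        for (Map.Entry<String, V> entry : raw.entrySet()) {
            typed.put(keyParser.apply(entry.getKey()), entry.getValue());
        }
        return typed;
    }

    public static void main(String[] args) {
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("2024-01-26", "v0");

        // For a map<date, string> field the key becomes a LocalDate, not a String.
        Map<LocalDate, String> typed = convertKeys(raw, LocalDate::parse);
        System.out.println(typed.containsKey(LocalDate.of(2024, 1, 26)));
    }
}
```

In the actual change the per-type parsing is delegated to the converter returned by `createConverter(type.getKeyType())`; the sketch replaces that with a plain `Function` so it can run without SeaTunnel or Jackson on the classpath.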
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index 109f8b496a..7a81625811 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -18,10 +18,13 @@
package org.apache.seatunnel.format.json;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,23 +32,30 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.junit.jupiter.api.Test;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalQueries;
import java.util.HashMap;
import java.util.Map;
import static org.apache.seatunnel.api.table.type.ArrayType.INT_ARRAY_TYPE;
import static org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -391,4 +401,84 @@ public class JsonRowDataSerDeSchemaTest {
assertEquals(actual.getMessage(), expected.getMessage());
}
+
+ @Test
+ public void testMapConverterKeyType() throws JsonProcessingException {
+ MapType<String, String> stringKeyMapType = new MapType<>(STRING_TYPE, STRING_TYPE);
+ MapType<Boolean, String> booleanKeyMapType = new MapType<>(BOOLEAN_TYPE, STRING_TYPE);
+ MapType<Byte, String> tinyintKeyMapType = new MapType<>(BYTE_TYPE, STRING_TYPE);
+ MapType<Short, String> smallintKeyMapType = new MapType<>(SHORT_TYPE, STRING_TYPE);
+ MapType<Integer, String> intKeyMapType = new MapType<>(INT_TYPE, STRING_TYPE);
+ MapType<Long, String> bigintKeyMapType = new MapType<>(LONG_TYPE, STRING_TYPE);
+ MapType<Float, String> floatKeyMapType = new MapType<>(FLOAT_TYPE, STRING_TYPE);
+ MapType<Double, String> doubleKeyMapType = new MapType<>(DOUBLE_TYPE, STRING_TYPE);
+ MapType<LocalDate, String> dateKeyMapType =
+ new MapType<>(LocalTimeType.LOCAL_DATE_TYPE, STRING_TYPE);
+ MapType<LocalTime, String> timeKeyMapType =
+ new MapType<>(LocalTimeType.LOCAL_TIME_TYPE, STRING_TYPE);
+ MapType<LocalDateTime, String> timestampKeyMapType =
+ new MapType<>(LocalTimeType.LOCAL_DATE_TIME_TYPE, STRING_TYPE);
+ MapType<BigDecimal, String> decimalKeyMapType =
+ new MapType<>(new DecimalType(10, 2), STRING_TYPE);
+
+ JsonToRowConverters converters = new JsonToRowConverters(true, false);
+
+ JsonToRowConverters.JsonToRowConverter stringConverter =
+ converters.createConverter(stringKeyMapType);
+ JsonToRowConverters.JsonToRowConverter booleanConverter =
+ converters.createConverter(booleanKeyMapType);
+ JsonToRowConverters.JsonToRowConverter tinyintConverter =
+ converters.createConverter(tinyintKeyMapType);
+ JsonToRowConverters.JsonToRowConverter smallintConverter =
+ converters.createConverter(smallintKeyMapType);
+ JsonToRowConverters.JsonToRowConverter intConverter =
+ converters.createConverter(intKeyMapType);
+ JsonToRowConverters.JsonToRowConverter bigintConverter =
+ converters.createConverter(bigintKeyMapType);
+ JsonToRowConverters.JsonToRowConverter floatConverter =
+ converters.createConverter(floatKeyMapType);
+ JsonToRowConverters.JsonToRowConverter doubleConverter =
+ converters.createConverter(doubleKeyMapType);
+ JsonToRowConverters.JsonToRowConverter dateConverter =
+ converters.createConverter(dateKeyMapType);
+ JsonToRowConverters.JsonToRowConverter timeConverter =
+ converters.createConverter(timeKeyMapType);
+ JsonToRowConverters.JsonToRowConverter timestampConverter =
+ converters.createConverter(timestampKeyMapType);
+ JsonToRowConverters.JsonToRowConverter decimalConverter =
+ converters.createConverter(decimalKeyMapType);
+
+ assertMapKeyType("{\"abc\": \"xxx\"}", stringConverter, "abc");
+ assertMapKeyType("{\"false\": \"xxx\"}", booleanConverter, false);
+ assertMapKeyType("{\"1\": \"xxx\"}", tinyintConverter, (byte) 1);
+ assertMapKeyType("{\"12\": \"xxx\"}", smallintConverter, (short) 12);
+ assertMapKeyType("{\"123\": \"xxx\"}", intConverter, 123);
+ assertMapKeyType("{\"12345\": \"xxx\"}", bigintConverter, 12345L);
+ assertMapKeyType("{\"1.0001\": \"xxx\"}", floatConverter, 1.0001f);
+ assertMapKeyType("{\"999.9999\": \"xxx\"}", doubleConverter, 999.9999);
+ assertMapKeyType("{\"9999.23\": \"xxx\"}", decimalConverter, BigDecimal.valueOf(9999.23));
+
+ LocalDate date =
+ DateTimeFormatter.ISO_LOCAL_DATE
+ .parse("2024-01-26")
+ .query(TemporalQueries.localDate());
+ assertMapKeyType("{\"2024-01-26\": \"xxx\"}", dateConverter, date);
+
+ LocalTime time =
+ JsonToRowConverters.TIME_FORMAT
+ .parse("12:00:12.001")
+ .query(TemporalQueries.localTime());
+ assertMapKeyType("{\"12:00:12.001\": \"xxx\"}", timeConverter, time);
+
+ LocalDateTime timestamp = LocalDateTime.of(date, time);
+ assertMapKeyType("{\"2024-01-26T12:00:12.001\": \"xxx\"}", timestampConverter, timestamp);
+ }
+
+ private void assertMapKeyType(
+ String payload, JsonToRowConverters.JsonToRowConverter converter, Object expect)
+ throws JsonProcessingException {
+ JsonNode keyMapNode = JsonUtils.stringToJsonNode(payload);
+ Map<?, ?> keyMap = (Map<?, ?>) converter.convert(keyMapNode);
+ assertEquals(expect, keyMap.keySet().iterator().next());
+ }
}
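
Note on the map-key fix above: JSON object keys are always strings, so a map declared with a non-string key type needs its keys parsed into that type during deserialization, which is what the new test checks for each key type. The following is a minimal standalone sketch of that idea only. It does not use SeaTunnel's JsonToRowConverters; the class MapKeyConversionSketch and the method convertKeys are hypothetical names introduced for illustration, and a plain Function<String, K> stands in for the key converter.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration, not part of the SeaTunnel code base.
public class MapKeyConversionSketch {

    // Rebuilds a map whose keys arrived as JSON strings, parsing each key
    // into the declared key type K. Values are passed through unchanged.
    static <K> Map<K, String> convertKeys(
            Map<String, String> raw, Function<String, K> keyParser) {
        Map<K, String> typed = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            typed.put(keyParser.apply(entry.getKey()), entry.getValue());
        }
        return typed;
    }

    public static void main(String[] args) {
        // Equivalent of the payload {"123": "xxx"} with an INT key type.
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("123", "xxx");
        Map<Integer, String> intKeyed = convertKeys(raw, Integer::parseInt);
        System.out.println(intKeyed.containsKey(123));

        // Equivalent of the payload {"2024-01-26": "xxx"} with a DATE key type.
        Map<String, String> rawDate = new LinkedHashMap<>();
        rawDate.put("2024-01-26", "xxx");
        Map<LocalDate, String> dateKeyed = convertKeys(rawDate, LocalDate::parse);
        System.out.println(dateKeyed.containsKey(LocalDate.of(2024, 1, 26)));
    }
}
```

Without the key parsing step, a lookup by the typed key (for example the Integer 123) would miss, because the stored key would still be the string "123".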