This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 576919bfab [Feature][Connector-V2][Assert] Support field type assert and field value equality assert for full data types (#6275)
576919bfab is described below

commit 576919bfab21a8788fdad54b0ff3a024268005db
Author: Chengyu Yan <[email protected]>
AuthorDate: Tue Jan 30 16:01:47 2024 +0800

    [Feature][Connector-V2][Assert] Support field type assert and field value equality assert for full data types (#6275)
    
    - Support null, row, array, map type check
    - Support array, map, row value equal check
    - Add an assert rule that value can be null
    - Support declaring data types and data values for FakeSource and AssertSink in a consistent way
    - Fix JsonDeserializationSchema never converting the keys of a map
---
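Note on `bytes` values: the FakeSource documentation changed in this commit states that `bytes` fields are assigned as base64-encoded strings, and its example uses `"bWlJWmo="` for the text `miIZj`. The standalone Java sketch below only illustrates that encoding with the JDK's `java.util.Base64`. The class and method names are hypothetical and are not taken from the SeaTunnel code base.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper (not part of SeaTunnel): maps a bytes value to its config string form. */
public class BytesFieldSketch {

    /** Encodes raw bytes into the base64 string form used in the FakeSource/Assert examples. */
    public static String toConfigString(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    /** Decodes a base64 config string back into the raw bytes it stands for. */
    public static byte[] fromConfigString(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        String encoded = toConfigString("miIZj".getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded); // prints bWlJWmo=
        System.out.println(new String(fromConfigString(encoded), StandardCharsets.UTF_8)); // prints miIZj
    }
}
```

The same string would then appear on both sides of a test config: as the value in FakeSource `rows` and as the `equals_to` value in the Assert sink.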
 docs/en/concept/schema-feature.md                  |   4 +-
 docs/en/connector-v2/sink/Assert.md                | 370 +++++++++++++++++++--
 docs/en/connector-v2/source/FakeSource.md          |   5 +-
 release-note.md                                    |   1 +
 .../catalog/SeaTunnelDataTypeConvertorUtil.java    |  29 +-
 .../SeaTunnelDataTypeConvertorUtilTest.java        |  77 +++++
 seatunnel-connectors-v2/connector-assert/pom.xml   |   5 +
 .../assertion/excecutor/AssertExecutor.java        | 314 +++++++++++++----
 .../seatunnel/assertion/rule/AssertFieldRule.java  |   4 +-
 .../seatunnel/assertion/rule/AssertRuleParser.java |  83 +++--
 .../flink/assertion/AssertExecutorTest.java        | 285 +++++++++++++++-
 .../flink/assertion/rule/AssertRuleParserTest.java |  39 ++-
 .../connector/assertion/FakeSourceToAssertIT.java  |  22 ++
 .../assertion/fake_full_types_to_assert.conf       | 308 +++++++++++++++++
 .../resources/assertion/fake_row_to_assert.conf    | 114 +++++++
 .../test/resources/fake_to_assert_with_range.conf  |   6 +-
 .../resources/fake_to_assert_with_template.conf    |   6 +-
 .../json/local_file_json_lzo_to_console.conf       |   2 +-
 .../text/local_file_text_lzo_to_assert.conf        |   2 +-
 .../json/oss_file_json_lzo_to_console.conf         |   2 +-
 .../text/oss_file_text_lzo_to_assert.conf          |   2 +-
 .../src/test/resources/iceberg/iceberg_source.conf |   2 +-
 .../src/test/resources/iceberg/iceberg_source.conf |   2 +-
 .../resources/jdbc_hive_source_and_assert.conf     |   6 +-
 .../test/resources/avro/kafka_avro_to_assert.conf  |   2 +-
 .../jsonFormatIT/kafka_source_json_to_console.conf |   2 +-
 .../kafka/kafkasource_earliest_to_console.conf     |   2 +-
 ...ce_format_error_handle_way_fail_to_console.conf |   2 +-
 ...ce_format_error_handle_way_skip_to_console.conf |   2 +-
 .../kafka/kafkasource_group_offset_to_console.conf |   2 +-
 .../kafka/kafkasource_latest_to_console.conf       |   2 +-
 .../kafkasource_specific_offsets_to_console.conf   |   2 +-
 .../kafka/kafkasource_timestamp_to_console.conf    |   2 +-
 .../textFormatIT/kafka_source_text_to_console.conf |   2 +-
 ...ource_text_to_console_assert_catalog_table.conf |   2 +-
 .../test/resources/batch_pulsar_to_console.conf    |   6 +-
 .../resources/rocketmq-source_json_to_console.conf |   2 +-
 .../resources/rocketmq-source_text_to_console.conf |   2 +-
 .../rocketmq_source_earliest_to_console.conf       |   2 +-
 .../rocketmq_source_group_offset_to_console.conf   |   2 +-
 .../rocketmq_source_latest_to_console.conf         |   2 +-
 ...ocketmq_source_specific_offsets_to_console.conf |   2 +-
 .../rocketmq_source_timestamp_to_console.conf      |   2 +-
 .../resources/sql_transform/func_datetime.conf     |  14 +-
 .../test/resources/sql_transform/func_string.conf  |   6 +-
 .../seatunnel/format/json/JsonToRowConverters.java |  13 +-
 .../format/json/JsonRowDataSerDeSchemaTest.java    |  90 +++++
 47 files changed, 1644 insertions(+), 211 deletions(-)
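
Note on the `SeaTunnelDataTypeConvertorUtil.java` change listed above: the diff adds a private `compatibleTypeDeclare` helper that maps the legacy keywords `long`, `short` and `byte` to `bigint`, `smallint` and `tinyint` before the type lookup. The sketch below is a standalone approximation of that behaviour for illustration only. The class `TypeAliasSketch` and the method `canonicalKeyword` are hypothetical, and the use of `Locale.ROOT` is an assumption added here (the committed code calls `toUpperCase()` without a locale).

```java
import java.util.Locale;

/** Hypothetical standalone sketch of the legacy alias mapping; not the SeaTunnel class itself. */
public class TypeAliasSketch {

    /** Maps legacy aliases to standard keywords; any other declaration passes through unchanged. */
    public static String compatibleTypeDeclare(String declare) {
        switch (declare.trim().toUpperCase(Locale.ROOT)) {
            case "LONG":
                return "BIGINT";
            case "SHORT":
                return "SMALLINT";
            case "BYTE":
                return "TINYINT";
            default:
                return declare;
        }
    }

    /** Applies the upper-casing and space removal that the caller performs before its lookup. */
    public static String canonicalKeyword(String declare) {
        return compatibleTypeDeclare(declare).toUpperCase(Locale.ROOT).replace(" ", "");
    }

    public static void main(String[] args) {
        System.out.println(canonicalKeyword("long"));           // prints BIGINT
        System.out.println(canonicalKeyword(" Short "));        // prints SMALLINT
        System.out.println(canonicalKeyword("decimal(30, 8)")); // prints DECIMAL(30,8)
    }
}
```

Keeping the alias mapping in one small function, as the commit does, makes it easy to delete once the legacy keywords are no longer accepted.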

diff --git a/docs/en/concept/schema-feature.md b/docs/en/concept/schema-feature.md
index 04463c2563..15f8186cce 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -91,7 +91,9 @@ columns = [
 
 #### How to declare type supported
 
-SeaTunnel provides a simple and direct way to declare basic types. The keyword names for basic types can be used directly as type declarations, and SeaTunnel is case-insensitive to type keywords. Basic type keywords include `string`, `boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`, `time`, `timestamp`, and `null`. For example, if you need to declare a field with integer type, you can simply define the field as `int` or `"int"`.
+SeaTunnel provides a simple and direct way to declare basic types. Basic type keywords include `string`, `boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`, `time`, `timestamp`, and `null`. The keyword names for basic types can be used directly as type declarations, and SeaTunnel is case-insensitive to type keywords. For example, if you need to declare a field with integer type, you can simply define the field as `int` or `"int"`.
+
+> The null type declaration must be enclosed in double quotes, like `"null"`. This approach helps avoid confusion with [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md)'s `null` type which represents undefined object.
 
 When declaring complex types (such as **decimal**, **array**, **map**, and **row**), pay attention to specific considerations.
 - When declaring a decimal type, precision and scale settings are required, and the type definition follows the format `decimal(precision, scale)`. It's essential to emphasize that the declaration of the decimal type must be enclosed in `"`; you cannot use the type name directly, as with basic types. For example, when declaring a decimal field with precision 10 and scale 2, you specify the field type as `"decimal(10,2)"`.
diff --git a/docs/en/connector-v2/sink/Assert.md b/docs/en/connector-v2/sink/Assert.md
index 8257ff8f65..e02d0fc6b9 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -12,37 +12,38 @@ A flink sink plugin which can assert illegal data by user defined rules
 
 ## Options
 
-|                                              Name                                              |    Type    | Required | Default |
-|------------------------------------------------------------------------------------------------|------------|----------|---------|
-| rules                                                                                          | ConfigMap  | yes      | -       |
-| rules.field_rules                                                                              | string     | yes      | -       |
-| rules.field_rules.field_name                                                                   | string     | yes      | -       |
-| rules.field_rules.field_type                                                                   | string     | no       | -       |
-| rules.field_rules.field_value                                                                  | ConfigList | no       | -       |
-| rules.field_rules.field_value.rule_type                                                        | string     | no       | -       |
-| rules.field_rules.field_value.rule_value                                                       | double     | no       | -       |
-| rules.row_rules                                                                                | string     | yes      | -       |
-| rules.row_rules.rule_type                                                                      | string     | no       | -       |
-| rules.row_rules.rule_value                                                                     | string     | no       | -       |
-| rules.catalog_table_rule                                                                       | ConfigMap  | no       | -       |
-| rules.catalog_table_rule.primary_key_rule                                                      | ConfigMap  | no       | -       |
-| rules.catalog_table_rule.primary_key_rule.primary_key_name                                     | string     | no       | -       |
-| rules.catalog_table_rule.primary_key_rule.primary_key_columns                                  | list       | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule                                                   | ConfigList | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_name                               | string     | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_type                               | string     | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns                            | ConfigList | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string     | no       | -       |
-| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type   | string     | no       | -       |
-| rules.catalog_table_rule.column_rule                                                           | ConfigList | no       | -       |
-| rules.catalog_table_rule.column_rule.name                                                      | string     | no       | -       |
-| rules.catalog_table_rule.column_rule.type                                                      | string     | no       | -       |
-| rules.catalog_table_rule.column_rule.column_length                                             | int        | no       | -       |
-| rules.catalog_table_rule.column_rule.nullable                                                  | boolean    | no       | -       |
-| rules.catalog_table_rule.column_rule.default_value                                             | string     | no       | -       |
-| rules.catalog_table_rule.column_rule.comment                                                   | comment    | no       | -       |
-| rules.table-names                                                                              | list       | no       | -       |
-| common-options                                                                                 |            | no       | -       |
+|                                              Name                                              |                      Type                       | Required | Default |
+|------------------------------------------------------------------------------------------------|-------------------------------------------------|----------|---------|
+| rules                                                                                          | ConfigMap                                       | yes      | -       |
+| rules.field_rules                                                                              | string                                          | yes      | -       |
+| rules.field_rules.field_name                                                                   | string\|ConfigMap                               | yes      | -       |
+| rules.field_rules.field_type                                                                   | string                                          | no       | -       |
+| rules.field_rules.field_value                                                                  | ConfigList                                      | no       | -       |
+| rules.field_rules.field_value.rule_type                                                        | string                                          | no       | -       |
+| rules.field_rules.field_value.rule_value                                                       | numeric                                         | no       | -       |
+| rules.field_rules.field_value.equals_to                                                        | boolean\|numeric\|string\|ConfigList\|ConfigMap | no       | -       |
+| rules.row_rules                                                                                | string                                          | yes      | -       |
+| rules.row_rules.rule_type                                                                      | string                                          | no       | -       |
+| rules.row_rules.rule_value                                                                     | string                                          | no       | -       |
+| rules.catalog_table_rule                                                                       | ConfigMap                                       | no       | -       |
+| rules.catalog_table_rule.primary_key_rule                                                      | ConfigMap                                       | no       | -       |
+| rules.catalog_table_rule.primary_key_rule.primary_key_name                                     | string                                          | no       | -       |
+| rules.catalog_table_rule.primary_key_rule.primary_key_columns                                  | ConfigList                                      | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule                                                   | ConfigList                                      | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_name                               | string                                          | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_type                               | string                                          | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns                            | ConfigList                                      | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string                                          | no       | -       |
+| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type   | string                                          | no       | -       |
+| rules.catalog_table_rule.column_rule                                                           | ConfigList                                      | no       | -       |
+| rules.catalog_table_rule.column_rule.name                                                      | string                                          | no       | -       |
+| rules.catalog_table_rule.column_rule.type                                                      | string                                          | no       | -       |
+| rules.catalog_table_rule.column_rule.column_length                                             | int                                             | no       | -       |
+| rules.catalog_table_rule.column_rule.nullable                                                  | boolean                                         | no       | -       |
+| rules.catalog_table_rule.column_rule.default_value                                             | string                                          | no       | -       |
+| rules.catalog_table_rule.column_rule.comment                                                   | comment                                         | no       | -       |
+| rules.table-names                                                                              | ConfigList                                      | no       | -       |
+| common-options                                                                                 |                                                 | no       | -       |
 
 ### rules [ConfigMap]
 
@@ -56,9 +57,9 @@ field rules for field validation
 
 field name(string)
 
-### field_type [string]
+### field_type [string | ConfigMap]
 
-field type (string),  e.g. `string,boolean,byte,short,int,long,float,double,char,void,BigInteger,BigDecimal,Instant`
+Field type declarations should adhere to this [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
 
 ### field_value [ConfigList]
 
@@ -68,6 +69,7 @@ A list value rule define the data value validation
 
 The following rules are supported for now
 - NOT_NULL `value can't be null`
+- NULL `value can be null`
 - MIN `define the minimum value of data`
 - MAX `define the maximum value of data`
 - MIN_LENGTH `define the minimum string length of a string data`
@@ -75,9 +77,17 @@ The following rules are supported for now
 - MIN_ROW `define the minimun number of rows`
 - MAX_ROW `define the maximum number of rows`
 
-### rule_value [double]
+### rule_value [numeric]
 
-the value related to rule type
+The value related to rule type. When the `rule_type` is `MIN`, `MAX`, `MIN_LENGTH`, `MAX_LENGTH`, `MIN_ROW` or `MAX_ROW`, users need to assign a value to the `rule_value`.
+
+### equals_to [boolean | numeric | string | ConfigList | ConfigMap]
+
+`equals_to` is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to `equals_to`. These types are detailed [here](../../concept/schema-feature.md#what-type-supported-at-now). For instance, if one field is a row with three fields, and the declaration of row type is `{a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}`, users can assign the value `[["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, [...]
+
+> The way of defining field values is consistent with [FakeSource](../source/FakeSource.md#customize-the-data-content-simple).
+>
+> `equals_to` cannot be applied to `null` type fields. However, users can use the rule type `NULL` for verification, such as `{rule_type = NULL}`.
 
 ### catalog_table_rule [ConfigMap]
 
@@ -131,6 +141,7 @@ Assert {
           field_value = [
             {
               rule_type = NOT_NULL
+              equals_to = 23
             },
             {
               rule_type = MIN
@@ -178,7 +189,296 @@ Assert {
       }
 
   }
+```
+
+Here is a more complex example about `equals_to`. The example involves FakeSource. You may want to learn it, please read this [document](../source/FakeSource.md).
+
+```hocon
+source {
+  FakeSource {
+    row.num = 1
+    schema = {
+      fields {
+        c_null = "null"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_timestamp = timestamp
+        c_time = time
+        c_bytes = bytes
+        c_array = "array<int>"
+        c_map = "map<time, string>"
+        c_map_nest = "map<string, {c_int = int, c_string = string}>"
+        c_row = {
+          c_null = "null"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_date = date
+          c_timestamp = timestamp
+          c_time = time
+          c_bytes = bytes
+          c_array = "array<int>"
+          c_map = "map<string, string>"
+        }
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [
+          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+          "bWlJWmo=",
+          [0, 1, 2],
+          "{ 12:01:26 = v0 }",
+          { k1 = [123, "BBB-BB"]},
+          [
+            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+            "bWlJWmo=",
+            [0, 1, 2],
+            { k0 = v0 }
+          ]
+        ]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
 
+sink{
+  Assert {
+    source_table_name = "fake"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          }
+        ],
+        field_rules = [
+            {
+                field_name = c_null
+                field_type = "null"
+                field_value = [
+                    {
+                        rule_type = NULL
+                    }
+                ]
+            },
+            {
+                field_name = c_string
+                field_type = string
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "AAA"
+                    }
+                ]
+            },
+            {
+                field_name = c_boolean
+                field_type = boolean
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = false
+                    }
+                ]
+            },
+            {
+                field_name = c_tinyint
+                field_type = tinyint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 1
+                    }
+                ]
+            },
+            {
+                field_name = c_smallint
+                field_type = smallint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 1
+                    }
+                ]
+            },
+            {
+                field_name = c_int
+                field_type = int
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 333
+                    }
+                ]
+            },
+            {
+                field_name = c_bigint
+                field_type = bigint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 323232
+                    }
+                ]
+            },
+            {
+                field_name = c_float
+                field_type = float
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 3.1
+                    }
+                ]
+            },
+            {
+                field_name = c_double
+                field_type = double
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 9.33333
+                    }
+                ]
+            },
+            {
+                field_name = c_decimal
+                field_type = "decimal(30, 8)"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 99999.99999999
+                    }
+                ]
+            },
+            {
+                field_name = c_date
+                field_type = date
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "2012-12-21"
+                    }
+                ]
+            },
+            {
+                field_name = c_timestamp
+                field_type = timestamp
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "2012-12-21T12:34:56"
+                    }
+                ]
+            },
+            {
+                field_name = c_time
+                field_type = time
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "12:34:56"
+                    }
+                ]
+            },
+            {
+                field_name = c_bytes
+                field_type = bytes
+                field_value = [
+                      {
+                          rule_type = NOT_NULL
+                          equals_to = "bWlJWmo="
+                      }
+                ]
+            },
+            {
+                field_name = c_array
+                field_type = "array<int>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = [0, 1, 2]
+                    }
+                ]
+            },
+            {
+                field_name = c_map
+                field_type = "map<time, string>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "{ 12:01:26 = v0 }"
+                    }
+                ]
+            },
+            {
+                field_name = c_map_nest
+                field_type = "map<string, {c_int = int, c_string = string}>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = { k1 = [123, "BBB-BB"] }
+                    }
+                ]
+            },
+            {
+                field_name = c_row
+                field_type = {
+                    c_null = "null"
+                    c_string = string
+                    c_boolean = boolean
+                    c_tinyint = tinyint
+                    c_smallint = smallint
+                    c_int = int
+                    c_bigint = bigint
+                    c_float = float
+                    c_double = double
+                    c_decimal = "decimal(30, 8)"
+                    c_date = date
+                    c_timestamp = timestamp
+                    c_time = time
+                    c_bytes = bytes
+                    c_array = "array<int>"
+                    c_map = "map<string, string>"
+                }
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = [
+                           null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+                           "bWlJWmo=",
+                           [0, 1, 2],
+                           { k0 = v0 }
+                        ]
+                    }
+                ]
+            }
+        ]
+    }
+  }
+}
 ```
 
 ## Changelog
diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 43cc8dc671..c85df37261 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -68,12 +68,13 @@ just for some test cases such as type conversion or connector new feature testin
 
 ### Simple:
 
-> This example Randomly generates data of a specified type
+> This example Randomly generates data of a specified type. If you want to learn how to declare field types, click [here](../../concept/schema-feature.md#how-to-declare-type-supported).
 
 ```hocon
 schema = {
   fields {
     c_map = "map<string, array<int>>"
+    c_map_nest = "map<string, {c_int = int, c_string = string}>"
     c_array = "array<int>"
     c_string = string
     c_boolean = boolean
@@ -190,6 +191,8 @@ source {
 }
 ```
 
+> Due to the constraints of the [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md) specification, users cannot directly create byte sequence objects. FakeSource uses strings to assign `bytes` type values. In the example above, the `bytes` type field is assigned `"bWlJWmo="`, which is encoded from "miIZj" with **base64**. Hence, when assigning values to `bytes` type fields, please use strings encoded with **base64**.
+
 ### Specified Data number Simple:
 
 > This case specifies the number of data generated and the length of the generated value
diff --git a/release-note.md b/release-note.md
index 831018d273..e31dad48fe 100644
--- a/release-note.md
+++ b/release-note.md
@@ -187,6 +187,7 @@
 - [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
 - [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
 - [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
+- [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
 
 ### Zeta(ST-Engine)
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index ce71a46051..cc7ec83fb1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -43,7 +43,8 @@ public class SeaTunnelDataTypeConvertorUtil {
             String field, String columnType) {
         SqlType sqlType = null;
         try {
-            sqlType = SqlType.valueOf(columnType.toUpperCase().replace(" ", ""));
+            String compatible = compatibleTypeDeclare(columnType);
+            sqlType = SqlType.valueOf(compatible.toUpperCase().replace(" ", ""));
         } catch (IllegalArgumentException e) {
             // nothing
         }
@@ -84,6 +85,32 @@ public class SeaTunnelDataTypeConvertorUtil {
         }
     }
 
+    /**
+     * User-facing data type declarations will adhere to the specifications outlined in
+     * schema-feature.md. To maintain backward compatibility, this function will transform type
+     * declarations into standard form, including: <code>long -> bigint</code>, <code>
+     * short -> smallint</code>, and <code>byte -> tinyint</code>.
+     *
+     * <p>In a future version, user-facing data type declarations will strictly follow the
+     * specifications, and this function will be removed.
+     *
+     * @param declare
+     * @return compatible type
+     */
+    @Deprecated
+    private static String compatibleTypeDeclare(String declare) {
+        switch (declare.trim().toUpperCase()) {
+            case "LONG":
+                return "BIGINT";
+            case "SHORT":
+                return "SMALLINT";
+            case "BYTE":
+                return "TINYINT";
+            default:
+                return declare;
+        }
+    }
+
     private static SeaTunnelDataType<?> parseComplexDataType(String field, String columnStr) {
         String column = columnStr.toUpperCase().replace(" ", "");
         if (column.startsWith(SqlType.MAP.name())) {
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
index f126722d83..56934666bc 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.junit.jupiter.api.Assertions;
@@ -89,4 +94,76 @@ public class SeaTunnelDataTypeConvertorUtilTest {
                         "Unsupported parse SeaTunnel Type from '%s'.", invalidTypeDeclaration);
         Assertions.assertEquals(expectedMsg6, exception6.getMessage());
     }
+
+    @Test
+    public void testCompatibleTypeDeclare() {
+        SeaTunnelDataType<?> longType =
+                SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_long", "long");
+        Assertions.assertEquals(BasicType.LONG_TYPE, longType);
+
+        SeaTunnelDataType<?> shortType =
+                SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_short", "short");
+        Assertions.assertEquals(BasicType.SHORT_TYPE, shortType);
+
+        SeaTunnelDataType<?> byteType =
+                SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("c_byte", "byte");
+        Assertions.assertEquals(BasicType.BYTE_TYPE, byteType);
+
+        ArrayType<?, ?> longArrayType =
+                (ArrayType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_long_array", "array<long>");
+        Assertions.assertEquals(ArrayType.LONG_ARRAY_TYPE, longArrayType);
+
+        ArrayType<?, ?> shortArrayType =
+                (ArrayType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_short_array", "array<short>");
+        Assertions.assertEquals(ArrayType.SHORT_ARRAY_TYPE, shortArrayType);
+
+        ArrayType<?, ?> byteArrayType =
+                (ArrayType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_byte_array", "array<byte>");
+        Assertions.assertEquals(ArrayType.BYTE_ARRAY_TYPE, byteArrayType);
+
+        MapType<?, ?> longMapType =
+                (MapType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_long_map", "map<long, long>");
+        Assertions.assertEquals(BasicType.LONG_TYPE, longMapType.getKeyType());
+        Assertions.assertEquals(BasicType.LONG_TYPE, longMapType.getValueType());
+
+        MapType<?, ?> shortMapType =
+                (MapType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_short_map", "map<short, short>");
+        Assertions.assertEquals(BasicType.SHORT_TYPE, shortMapType.getKeyType());
+        Assertions.assertEquals(BasicType.SHORT_TYPE, shortMapType.getValueType());
+
+        MapType<?, ?> byteMapType =
+                (MapType<?, ?>)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_byte_map", "map<byte, byte>");
+        Assertions.assertEquals(BasicType.BYTE_TYPE, byteMapType.getKeyType());
+        Assertions.assertEquals(BasicType.BYTE_TYPE, byteMapType.getValueType());
+
+        SeaTunnelRowType longRow =
+                (SeaTunnelRowType)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_long_row", "{c = long}");
+        Assertions.assertEquals(BasicType.LONG_TYPE, longRow.getFieldType(0));
+
+        SeaTunnelRowType shortRow =
+                (SeaTunnelRowType)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_short_row", "{c = short}");
+        Assertions.assertEquals(BasicType.SHORT_TYPE, shortRow.getFieldType(0));
+
+        SeaTunnelRowType byteRow =
+                (SeaTunnelRowType)
+                        SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                "c_byte_row", "{c = byte}");
+        Assertions.assertEquals(BasicType.BYTE_TYPE, byteRow.getFieldType(0));
+    }
 }
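Editor's note: the alias handling exercised by testCompatibleTypeDeclare can be sketched in isolation as below. This is a minimal illustration, not the SeaTunnel implementation: the class name TypeAliasSketch and the method name normalize are hypothetical, and the sketch only covers a bare declaration. The tests above suggest aliases are also resolved inside array, map and row declarations, but how that is wired is not shown in this part of the patch and is left out here.

```java
import java.util.Locale;

// Hypothetical stand-alone class for illustration; it is not part of SeaTunnel.
final class TypeAliasSketch {
    private TypeAliasSketch() {}

    // Maps the legacy aliases to their standard names and returns any other
    // declaration unchanged (including its original case and whitespace).
    static String normalize(String declare) {
        switch (declare.trim().toUpperCase(Locale.ROOT)) {
            case "LONG":
                return "BIGINT";
            case "SHORT":
                return "SMALLINT";
            case "BYTE":
                return "TINYINT";
            default:
                return declare;
        }
    }

    public static void main(String[] args) {
        System.out.println(normalize(" long "));         // prints BIGINT
        System.out.println(normalize("decimal(10, 2)")); // prints decimal(10, 2)
    }
}
```

Returning the input untouched in the default branch keeps the sketch neutral about every declaration it does not recognise, so later parsing stages see exactly what the user wrote.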
diff --git a/seatunnel-connectors-v2/connector-assert/pom.xml b/seatunnel-connectors-v2/connector-assert/pom.xml
index 6bbc76a5b1..dc141d2aa4 100644
--- a/seatunnel-connectors-v2/connector-assert/pom.xml
+++ b/seatunnel-connectors-v2/connector-assert/pom.xml
@@ -33,6 +33,11 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index 21142e8158..bc73dbf151 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -17,28 +17,28 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
+import java.io.IOException;
 import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.TemporalAccessor;
-import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -74,24 +74,28 @@ public class AssertExecutor {
                         Lists.newArrayList(rowType.getFieldNames()),
                         fieldName -> fieldName.equals(assertFieldRule.getFieldName()));
 
+        SeaTunnelDataType<?> type = rowType.getFieldType(index);
         Object value = rowData.getField(index);
-        if (Objects.isNull(value)) {
-            return Boolean.FALSE;
-        }
+
         Boolean typeChecked = checkType(value, assertFieldRule.getFieldType());
         if (Boolean.FALSE.equals(typeChecked)) {
             return Boolean.FALSE;
         }
-        Boolean valueChecked = checkValue(value, assertFieldRule.getFieldRules());
+        Boolean valueChecked = checkValue(value, type, assertFieldRule.getFieldRules());
         if (Boolean.FALSE.equals(valueChecked)) {
             return Boolean.FALSE;
         }
         return Boolean.TRUE;
     }
 
-    private Boolean checkValue(Object value, List<AssertFieldRule.AssertRule> fieldValueRules) {
+    private Boolean checkValue(
+            Object value,
+            SeaTunnelDataType<?> type,
+            List<AssertFieldRule.AssertRule> fieldValueRules) {
         Optional<AssertFieldRule.AssertRule> failValueRule =
-                fieldValueRules.stream().filter(valueRule -> !pass(value, valueRule)).findFirst();
+                fieldValueRules.stream()
+                        .filter(valueRule -> !pass(value, type, valueRule))
+                        .findFirst();
         if (failValueRule.isPresent()) {
             return Boolean.FALSE;
         } else {
@@ -99,78 +103,256 @@ public class AssertExecutor {
         }
     }
 
-    private boolean pass(Object value, AssertFieldRule.AssertRule valueRule) {
-        if (AssertFieldRule.AssertRuleType.NOT_NULL.equals(valueRule.getRuleType())) {
-            return Objects.nonNull(value);
+    private boolean pass(
+            Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+        AssertFieldRule.AssertRuleType ruleType = valueRule.getRuleType();
+        boolean isPass = true;
+        if (ruleType != null) {
+            isPass = checkAssertRule(value, type, valueRule);
         }
 
-        if (value instanceof Number
-                && AssertFieldRule.AssertRuleType.MAX.equals(valueRule.getRuleType())) {
-            return ((Number) value).doubleValue() <= valueRule.getRuleValue();
+        if (Objects.nonNull(value) && valueRule.getEqualTo() != null) {
+            isPass = isPass && compareValue(value, type, valueRule);
         }
-        if (value instanceof Number
-                && AssertFieldRule.AssertRuleType.MIN.equals(valueRule.getRuleType())) {
-            return ((Number) value).doubleValue() >= valueRule.getRuleValue();
+        return isPass;
+    }
+
+    private boolean checkAssertRule(
+            Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+        switch (valueRule.getRuleType()) {
+            case NULL:
+                return Objects.isNull(value);
+            case NOT_NULL:
+                return Objects.nonNull(value);
+            case MAX:
+                {
+                    if (Objects.isNull(value) || !(value instanceof Number)) {
+                        return Boolean.FALSE;
+                    }
+                    return ((Number) value).doubleValue() <= valueRule.getRuleValue();
+                }
+            case MIN:
+                {
+                    if (Objects.isNull(value) || !(value instanceof Number)) {
+                        return Boolean.FALSE;
+                    }
+                    return ((Number) value).doubleValue() >= valueRule.getRuleValue();
+                }
+            case MAX_LENGTH:
+                {
+                    String valueStr =
+                            Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+                    return valueStr.length() <= valueRule.getRuleValue();
+                }
+            case MIN_LENGTH:
+                {
+                    String valueStr =
+                            Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
+                    return valueStr.length() >= valueRule.getRuleValue();
+                }
+            default:
+                return false;
         }
-        if (valueRule.getEqualTo() != null) {
-            return compareValue(value, valueRule);
+    }
+
+    private boolean compareValue(
+            Object value, SeaTunnelDataType<?> type, AssertFieldRule.AssertRule valueRule) {
+        Object config = valueRule.getEqualTo();
+        String confJsonStr = JsonUtils.toJsonString(config);
+
+        JsonToRowConverters converters = new JsonToRowConverters(true, false);
+        JsonToRowConverters.JsonToRowConverter converter = converters.createConverter(type);
+
+        Object confValue;
+        try {
+            confValue =
+                    converter.convert(JsonUtils.stringToJsonNode(confJsonStr));
+        } catch (IOException e) {
+            throw CommonError.jsonOperationError("Assert", confJsonStr, e);
         }
-        String valueStr = Objects.isNull(value) ? StringUtils.EMPTY : String.valueOf(value);
-        if (AssertFieldRule.AssertRuleType.MAX_LENGTH.equals(valueRule.getRuleType())) {
-            return valueStr.length() <= valueRule.getRuleValue();
+        return compareValue(value, type, confValue);
+    }
+
+    private boolean compareValue(Object value, SeaTunnelDataType<?> type, Object confValue) {
+        switch (type.getSqlType()) {
+            case ROW:
+                {
+                    return compareRowValue(
+                            (SeaTunnelRow) value,
+                            (SeaTunnelRowType) type,
+                            (SeaTunnelRow) confValue);
+                }
+            case ARRAY:
+                {
+                    return compareArrayValue(
+                            (Object[]) value, (ArrayType<?, ?>) type, (Object[]) confValue);
+                }
+            case MAP:
+                {
+                    return compareMapValue(
+                            (Map<?, ?>) value, (MapType<?, ?>) type, (Map<?, ?>) confValue);
+                }
+            case NULL:
+                return value == null && confValue == null;
+            case BYTES:
+                {
+                    return Arrays.equals((byte[]) value, (byte[]) confValue);
+                }
+            case STRING:
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+            case TIME:
+            case TIMESTAMP:
+            case DATE:
+            default:
+                return Objects.equals(value, confValue);
         }
+    }
 
-        if (AssertFieldRule.AssertRuleType.MIN_LENGTH.equals(valueRule.getRuleType())) {
-            return valueStr.length() >= valueRule.getRuleValue();
+    private boolean compareRowValue(
+            SeaTunnelRow value, SeaTunnelRowType type, SeaTunnelRow confValue) {
+        Object[] valFields = value.getFields();
+        Object[] confValFields = confValue.getFields();
+        if (valFields.length != confValFields.length) {
+            return false;
         }
-        return Boolean.TRUE;
+        for (int idx = 0; idx < confValFields.length; idx++) {
+            Object fieldVal = valFields[idx];
+            Object confField = confValFields[idx];
+            SeaTunnelDataType<?> fieldType = type.getFieldType(idx);
+            if (!compareValue(fieldVal, fieldType, confField)) {
+                return false;
+            }
+        }
+        return true;
     }
 
-    private boolean compareValue(Object value, AssertFieldRule.AssertRule valueRule) {
-        if (value instanceof String) {
-            return value.equals(valueRule.getEqualTo());
-        } else if (value instanceof Integer) {
-            return value.equals(Integer.parseInt(valueRule.getEqualTo()));
-        } else if (value instanceof Long) {
-            return value.equals(Long.parseLong(valueRule.getEqualTo()));
-        } else if (value instanceof Short) {
-            return value.equals(Short.parseShort(valueRule.getEqualTo()));
-        } else if (value instanceof Float) {
-            return value.equals((Float.parseFloat(valueRule.getEqualTo())));
-        } else if (value instanceof Byte) {
-            return value.equals((Byte.parseByte(valueRule.getEqualTo())));
-        } else if (value instanceof Double) {
-            return value.equals(Double.parseDouble(valueRule.getEqualTo()));
-        } else if (value instanceof BigDecimal) {
-            return value.equals(new BigDecimal(valueRule.getEqualTo()));
-        } else if (value instanceof Boolean) {
-            return value.equals(Boolean.parseBoolean(valueRule.getEqualTo()));
-        } else if (value instanceof LocalDateTime) {
-            TemporalAccessor parsedTimestamp =
-                    DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(valueRule.getEqualTo());
-            LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
-            LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
-            return ((LocalDateTime) value).isEqual(LocalDateTime.of(localDate, localTime));
-        } else if (value instanceof LocalDate) {
-            DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-            return ((LocalDate) value).isEqual(LocalDate.parse(valueRule.getEqualTo(), fmt));
-        } else if (value instanceof LocalTime) {
-            DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss");
-            return value.equals(LocalTime.parse(valueRule.getEqualTo(), fmt));
-        } else {
-            throw new AssertConnectorException(
-                    AssertConnectorErrorCode.TYPES_NOT_SUPPORTED_FAILED,
-                    String.format(" %s types not supported yet", value.getClass().getSimpleName()));
+    private boolean compareArrayValue(Object[] value, ArrayType<?, ?> type, Object[] confValue) {
+        if (value.length != confValue.length) {
+            return false;
+        }
+
+        SeaTunnelDataType<?> elementType = type.getElementType();
+        for (int idx = 0; idx < confValue.length; idx++) {
+            Object elementVal = value[idx];
+            Object confElement = confValue[idx];
+            if (!compareValue(elementVal, elementType, confElement)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean compareMapValue(Map<?, ?> value, MapType<?, ?> type, Map<?, ?> confValue) {
+        if (value.size() != confValue.size()) {
+            return false;
+        }
+
+        if (value.isEmpty()) {
+            return true;
+        }
+
+        SeaTunnelDataType<?> valType = type.getValueType();
+        for (Map.Entry<?, ?> entry : confValue.entrySet()) {
+            Object confKey = entry.getKey();
+            Object confVal = entry.getValue();
+            if (!value.containsKey(confKey)) {
+                return false;
+            }
+
+            Object val = value.get(confKey);
+            if (!compareValue(val, valType, confVal)) {
+                return false;
+            }
         }
+        return true;
     }
 
     private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
+        if (value == null) {
+            if (fieldType.getSqlType() == SqlType.NULL) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        if (fieldType.getSqlType() == SqlType.ROW) {
+            return checkRowType(value, (SeaTunnelRowType) fieldType);
+        }
+
+        if (fieldType.getSqlType() == SqlType.ARRAY) {
+            return checkArrayType(value, (ArrayType<?, ?>) fieldType);
+        }
+
+        if (fieldType.getSqlType() == SqlType.MAP) {
+            return checkMapType(value, (MapType<?, ?>) fieldType);
+        }
+
         if (fieldType.getSqlType() == SqlType.DECIMAL) {
             return checkDecimalType(value, fieldType);
         }
+
         return value.getClass().equals(fieldType.getTypeClass());
     }
 
+    private boolean checkArrayType(Object value, ArrayType<?, ?> fieldType) {
+        if (!value.getClass().isArray()) {
+            return false;
+        }
+
+        Object[] val = (Object[]) value;
+        SeaTunnelDataType<?> elementType = fieldType.getElementType();
+
+        for (Object elementObj : val) {
+            if (!checkType(elementObj, elementType)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean checkMapType(Object value, MapType<?, ?> fieldType) {
+        if (!(value instanceof Map)) {
+            return false;
+        }
+
+        Map<?, ?> val = (Map<?, ?>) value;
+        SeaTunnelDataType<?> keyType = fieldType.getKeyType();
+        SeaTunnelDataType<?> valType = fieldType.getValueType();
+        for (Map.Entry<?, ?> entry : val.entrySet()) {
+            Object keyObj = entry.getKey();
+            Object valObj = entry.getValue();
+            if (!(checkType(keyObj, keyType) && checkType(valObj, valType))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean checkRowType(Object value, SeaTunnelRowType rowType) {
+        if (!(value instanceof SeaTunnelRow)) {
+            return false;
+        }
+
+        SeaTunnelRow row = (SeaTunnelRow) value;
+        Object[] fields = row.getFields();
+        for (int idx = 0; idx < fields.length; idx++) {
+            Object fieldVal = fields[idx];
+            SeaTunnelDataType<?> fieldType = rowType.getFieldType(idx);
+            if (!checkType(fieldVal, fieldType)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?> fieldType) {
         if (!value.getClass().equals(fieldType.getTypeClass())) {
             return false;
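Editor's note: the recursive equality check added to AssertExecutor above can be illustrated with plain Java values. The sketch below is an assumption-laden simplification, not the connector's code: the class name DeepEqualsSketch is hypothetical, and it dispatches on the runtime class of the value, whereas the patch dispatches on the declared SeaTunnelDataType. Leaf values are compared null-safely in the sketch.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration; dispatches on runtime class, not on SeaTunnelDataType.
final class DeepEqualsSketch {
    private DeepEqualsSketch() {}

    static boolean deepEquals(Object actual, Object expected) {
        if (actual == null || expected == null) {
            // Equal only when both sides are null.
            return actual == expected;
        }
        if (actual instanceof byte[] && expected instanceof byte[]) {
            return Arrays.equals((byte[]) actual, (byte[]) expected);
        }
        if (actual instanceof Object[] && expected instanceof Object[]) {
            Object[] a = (Object[]) actual;
            Object[] e = (Object[]) expected;
            if (a.length != e.length) {
                return false;
            }
            // Arrays are compared element by element, in order.
            for (int i = 0; i < e.length; i++) {
                if (!deepEquals(a[i], e[i])) {
                    return false;
                }
            }
            return true;
        }
        if (actual instanceof Map && expected instanceof Map) {
            Map<?, ?> a = (Map<?, ?>) actual;
            Map<?, ?> e = (Map<?, ?>) expected;
            if (a.size() != e.size()) {
                return false;
            }
            // Keys are looked up with equals/hashCode; values are compared recursively.
            for (Map.Entry<?, ?> entry : e.entrySet()) {
                if (!a.containsKey(entry.getKey())) {
                    return false;
                }
                if (!deepEquals(a.get(entry.getKey()), entry.getValue())) {
                    return false;
                }
            }
            return true;
        }
        return Objects.equals(actual, expected);
    }
}
```

Checking sizes before walking the expected side means a single pass over the expected entries is enough: equal sizes plus every expected key present implies there are no extra keys on the actual side.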
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
index 12d2e28bad..f3735c39f1 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertFieldRule.java
@@ -34,7 +34,7 @@ public class AssertFieldRule implements Serializable {
     public static class AssertRule implements Serializable {
         private AssertRuleType ruleType;
         private Double ruleValue;
-        private String equalTo;
+        private Object equalTo;
     }
 
     /**
@@ -42,6 +42,8 @@ public class AssertFieldRule implements Serializable {
      * break the rule
      */
     public enum AssertRuleType {
+        /** value must be null */
+        NULL,
         /** value can't be null */
         NOT_NULL,
         /** minimum value of the data */
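Editor's note: the way a single rule combines its rule type with an equals_to value, as done by pass() in AssertExecutor earlier in this patch, can be sketched as follows. This is a simplified stand-in under stated assumptions: the class RuleSketch and its method signatures are hypothetical, only four rule types are modelled, and equality is plain equals() rather than the type-aware comparison used by the connector.

```java
// Hypothetical illustration of the rule combination; not the connector's classes.
final class RuleSketch {
    enum RuleType { NULL, NOT_NULL, MAX, MIN }

    private RuleSketch() {}

    // A rule passes when its rule type (if any) holds and, for non-null values,
    // the value equals the expected value (if one is configured).
    static boolean pass(Object value, RuleType ruleType, Double ruleValue, Object equalTo) {
        boolean isPass = true;
        if (ruleType != null) {
            isPass = check(value, ruleType, ruleValue);
        }
        if (value != null && equalTo != null) {
            isPass = isPass && value.equals(equalTo);
        }
        return isPass;
    }

    private static boolean check(Object value, RuleType ruleType, Double ruleValue) {
        switch (ruleType) {
            case NULL:
                return value == null;
            case NOT_NULL:
                return value != null;
            case MAX:
                // instanceof is false for null, so null values fail numeric rules.
                return value instanceof Number && ((Number) value).doubleValue() <= ruleValue;
            case MIN:
                return value instanceof Number && ((Number) value).doubleValue() >= ruleValue;
            default:
                return false;
        }
    }
}
```

Note that in this model a NULL rule passes only when the value is actually null, and the equality check is skipped entirely for null values.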
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
index 842dcabd90..3b1e3594b2 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java
@@ -18,18 +18,15 @@
 package org.apache.seatunnel.connectors.seatunnel.assertion.rule;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
-import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
@@ -39,11 +36,8 @@ import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
 import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULE_VALUE;
 
+@Slf4j
 public class AssertRuleParser {
-
-    private static final Pattern DECIMAL_TYPE_PATTERN =
-            Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");
-
     public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends Config> rowRuleList) {
 
         return assembleFieldValueRules(rowRuleList);
@@ -58,9 +52,43 @@ public class AssertRuleParser {
                 .map(
                         config -> {
                             AssertFieldRule fieldRule = new AssertFieldRule();
+                            String fieldName = config.getString(FIELD_NAME);
                             fieldRule.setFieldName(config.getString(FIELD_NAME));
                             if (config.hasPath(FIELD_TYPE)) {
-                                fieldRule.setFieldType(getFieldType(config.getString(FIELD_TYPE)));
+                                ConfigValue fieldTypeConf = config.getValue(FIELD_TYPE);
+                                switch (fieldTypeConf.valueType()) {
+                                    case STRING:
+                                        {
+                                            String basicTypeStr = config.getString(FIELD_TYPE);
+                                            SeaTunnelDataType<?> fieldType =
+                                                    SeaTunnelDataTypeConvertorUtil
+                                                            .deserializeSeaTunnelDataType(
+                                                                    fieldName, basicTypeStr);
+                                            fieldRule.setFieldType(fieldType);
+                                        }
+                                        ;
+                                        break;
+                                    case OBJECT:
+                                        {
+                                            ConfigObject rowTypeConf = config.getObject(FIELD_TYPE);
+                                            SeaTunnelDataType<?> fieldType =
+                                                    SeaTunnelDataTypeConvertorUtil
+                                                            .deserializeSeaTunnelDataType(
+                                                                    fieldName,
+                                                                    rowTypeConf.render());
+                                            fieldRule.setFieldType(fieldType);
+                                        }
+                                        ;
+                                        break;
+                                    case BOOLEAN:
+                                    case NUMBER:
+                                    case LIST:
+                                    case NULL:
+                                        log.warn(
+                                                String.format(
+                                                        "Assert Field Rule[%s] doesn't support '%s' type value.",
+                                                        FIELD_TYPE, fieldTypeConf.valueType()));
+                                }
                             }
 
                             if (config.hasPath(FIELD_VALUE)) {
@@ -88,39 +116,10 @@ public class AssertRuleParser {
                                 valueRule.setRuleValue(config.getDouble(RULE_VALUE));
                             }
                             if (config.hasPath(EQUALS_TO)) {
-                                valueRule.setEqualTo(config.getString(EQUALS_TO));
+                                valueRule.setEqualTo(config.getValue(EQUALS_TO).unwrapped());
                             }
                             return valueRule;
                         })
                 .collect(Collectors.toList());
     }
-
-    private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
-        final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
-        Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
-        if (matcher.find()) {
-            int precision = Integer.parseInt(matcher.group(1));
-            int scale = Integer.parseInt(matcher.group(2));
-            return new DecimalType(precision, scale);
-        }
-        return TYPES.get(normalTypeStr);
-    }
-
-    private static final Map<String, SeaTunnelDataType<?>> TYPES = Maps.newHashMap();
-
-    static {
-        TYPES.put("string", BasicType.STRING_TYPE);
-        TYPES.put("boolean", BasicType.BOOLEAN_TYPE);
-        TYPES.put("byte", BasicType.BYTE_TYPE);
-        TYPES.put("short", BasicType.SHORT_TYPE);
-        TYPES.put("int", BasicType.INT_TYPE);
-        TYPES.put("long", BasicType.LONG_TYPE);
-        TYPES.put("float", BasicType.FLOAT_TYPE);
-        TYPES.put("double", BasicType.DOUBLE_TYPE);
-        TYPES.put("void", BasicType.VOID_TYPE);
-        TYPES.put("timestamp", LocalTimeType.LOCAL_DATE_TIME_TYPE);
-        TYPES.put("datetime", LocalTimeType.LOCAL_DATE_TIME_TYPE);
-        TYPES.put("date", LocalTimeType.LOCAL_DATE_TYPE);
-        TYPES.put("time", LocalTimeType.LOCAL_TIME_TYPE);
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
index 48b31989c6..96b61bfaae 100644
--- a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
+++ b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java
@@ -17,21 +17,39 @@
 
 package org.apache.seatunnel.flink.assertion;
 
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
 import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
 
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Lists;
 
+import java.io.IOException;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Base64;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -118,48 +136,285 @@ public class AssertExecutorTest {
 
     @Test
     public void testDecimalTypeCheck() {
+        assertFieldRuleNotNull(new DecimalType(10, 2), new BigDecimal("99999999.90"));
+    }
+
+    @Test
+    public void testDecimalTypeCheckError() {
         List<AssertFieldRule> rules = Lists.newArrayList();
         AssertFieldRule rule = new AssertFieldRule();
         rule.setFieldName("c_mock");
-        DecimalType assertFieldType = new DecimalType(10, 2);
+        DecimalType assertFieldType = new DecimalType(1, 0);
         rule.setFieldType(assertFieldType);
 
         AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
-        valueRule.setEqualTo("99999999.90");
-
+        valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
         rule.setFieldRules(Collections.singletonList(valueRule));
         rules.add(rule);
 
-        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new BigDecimal("99999999.90")});
+        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
         SeaTunnelRowType mockType =
                 new SeaTunnelRowType(
                         new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
 
         AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
-        assertNull(failRule);
+        assertNotNull(failRule);
+        assertEquals(assertFieldType, failRule.getFieldType());
+        assertEquals("c_mock", failRule.getFieldName());
     }
 
     @Test
-    public void testDecimalTypeCheckError() {
+    public void testDecimalEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"999999.90\" }").getValue("equals_to"),
+                new DecimalType(10, 2),
+                new BigDecimal("999999.90"));
+    }
+
+    @Test
+    public void testRowTypeCheck() {
+        SeaTunnelRowType assertFieldType =
+                new SeaTunnelRowType(
+                        new String[] {"c_0"}, new SeaTunnelDataType[] {BasicType.INT_TYPE});
+        assertFieldRuleNotNull(assertFieldType, new SeaTunnelRow(new Object[] {0}));
+    }
+
+    @Test
+    public void testRowEqualsTo() {
+        SeaTunnelRowType assertFieldType =
+                new SeaTunnelRowType(
+                        new String[] {"c_0", "c_1"},
+                        new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.STRING_TYPE});
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = [0, \"xx\"]}").getValue("equals_to"),
+                assertFieldType,
+                new SeaTunnelRow(new Object[] {0, "xx"}));
+    }
+
+    @Test
+    public void testNestRowEqualsTo() {
+        SeaTunnelRowType assertFieldType =
+                new SeaTunnelRowType(
+                        new String[] {"c_0"},
+                        new SeaTunnelDataType[] {
+                            new SeaTunnelRowType(
+                                    new String[] {"c_0_0"},
+                                    new SeaTunnelDataType[] {BasicType.INT_TYPE})
+                        });
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = [[1]]}").getValue("equals_to"),
+                assertFieldType,
+                new SeaTunnelRow(new Object[] {new SeaTunnelRow(new Object[] {1})}));
+    }
+
+    @Test
+    public void testArrayTypeCheck() {
+        assertFieldRuleNotNull(ArrayType.INT_ARRAY_TYPE, new Integer[] {0, 1, 2});
+    }
+
+    @Test
+    public void testArrayEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = [0, 1, 2]}").getValue("equals_to"),
+                ArrayType.INT_ARRAY_TYPE,
+                new Integer[] {0, 1, 2});
+    }
+
+    @Test
+    public void testMapTypeCheck() {
+        Map<String, String> map = new HashMap<>();
+        map.put("k0", "v0");
+        assertFieldRuleNotNull(
+                new MapType<String, String>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), map);
+    }
+
+    @Test
+    public void testMapEqualsTo() {
+        Map<String, String> map = new HashMap<>();
+        map.put("k0", "v0");
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = { k0 = v0 } }").getValue("equals_to"),
+                new MapType<String, String>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                map);
+    }
+
+    @Test
+    public void testNullTypeCheck() {
+        assertFieldRuleNull(BasicType.VOID_TYPE, null);
+    }
+
+    @Test
+    public void testStringEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"string\" }").getValue("equals_to"),
+                BasicType.STRING_TYPE,
+                "string");
+    }
+
+    @Test
+    public void testBooleanEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = false }").getValue("equals_to"),
+                BasicType.BOOLEAN_TYPE,
+                false);
+    }
+
+    @Test
+    public void testTinyIntEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 1 }").getValue("equals_to"),
+                BasicType.BYTE_TYPE,
+                (byte) 1);
+    }
+
+    @Test
+    public void testSmallIntEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 1 }").getValue("equals_to"),
+                BasicType.SHORT_TYPE,
+                (short) 1);
+    }
+
+    @Test
+    public void testIntEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 333 }").getValue("equals_to"),
+                BasicType.INT_TYPE,
+                (int) 333);
+    }
+
+    @Test
+    public void testBigIntEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 323232 }").getValue("equals_to"),
+                BasicType.LONG_TYPE,
+                (long) 323232L);
+    }
+
+    @Test
+    public void testFloatEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 3.1 }").getValue("equals_to"),
+                BasicType.FLOAT_TYPE,
+                (float) 3.1);
+    }
+
+    @Test
+    public void testDoubleEqualsTo() {
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = 19.33333 }").getValue("equals_to"),
+                BasicType.DOUBLE_TYPE,
+                (double) 19.33333);
+    }
+
+    @Test
+    public void testBytesEqualsTo() throws IOException {
+        byte[] bytes = "010101".getBytes();
+        String base64Str = Base64.getEncoder().encodeToString("010101".getBytes());
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"" + base64Str + "\" }")
+                        .getValue("equals_to"),
+                PrimitiveByteArrayType.INSTANCE,
+                (byte[]) bytes);
+    }
+
+    @Test
+    public void testDateEqualsTo() throws IOException {
+        String dateStr = "2024-01-24";
+        LocalDate date =
+                DateTimeFormatter.ISO_LOCAL_DATE.parse(dateStr).query(TemporalQueries.localDate());
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"" + dateStr + "\" }")
+                        .getValue("equals_to"),
+                LocalTimeType.LOCAL_DATE_TYPE,
+                (LocalDate) date);
+    }
+
+    @Test
+    public void testTimeEqualsTo() throws IOException {
+        String timeStr = "12:11:34";
+        LocalTime time =
+                JsonToRowConverters.TIME_FORMAT.parse(timeStr).query(TemporalQueries.localTime());
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"" + timeStr + "\" }")
+                        .getValue("equals_to"),
+                LocalTimeType.LOCAL_TIME_TYPE,
+                (LocalTime) time);
+    }
+
+    @Test
+    public void testTimestampEqualsTo() throws IOException {
+        String timestampStr = "2024-01-24T12:11:34.123";
+        TemporalAccessor parsedTimestamp =
+                DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(timestampStr);
+        LocalTime time = parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate date = parsedTimestamp.query(TemporalQueries.localDate());
+        LocalDateTime timestamp = LocalDateTime.of(date, time);
+        assertFieldRuleEqualsTo(
+                ConfigFactory.parseString("{equals_to = \"" + timestampStr + "\" }")
+                        .getValue("equals_to"),
+                LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                (LocalDateTime) timestamp);
+    }
+
+    private void assertFieldRuleNotNull(SeaTunnelDataType<?> type, Object value) {
+        assertFieldRuleMayNull(type, value, false);
+    }
+
+    private void assertFieldRuleNull(SeaTunnelDataType<?> type, Object value) {
+        assertFieldRuleMayNull(type, value, true);
+    }
+
+    private void assertFieldRuleMayNull(SeaTunnelDataType<?> type, Object value, boolean isNull) {
         List<AssertFieldRule> rules = Lists.newArrayList();
         AssertFieldRule rule = new AssertFieldRule();
         rule.setFieldName("c_mock");
-        DecimalType assertFieldType = new DecimalType(1, 0);
-        rule.setFieldType(assertFieldType);
+        rule.setFieldType(type);
 
         AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
-        valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
+        valueRule.setRuleType(
+                isNull
+                        ? AssertFieldRule.AssertRuleType.NULL
+                        : AssertFieldRule.AssertRuleType.NOT_NULL);
+
         rule.setFieldRules(Collections.singletonList(valueRule));
         rules.add(rule);
 
-        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
+        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {value});
         SeaTunnelRowType mockType =
-                new SeaTunnelRowType(
-                        new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
+                new SeaTunnelRowType(new String[] {"c_mock"}, new SeaTunnelDataType[] {type});
 
         AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
-        assertNotNull(failRule);
-        assertEquals(assertFieldType, failRule.getFieldType());
-        assertEquals("c_mock", failRule.getFieldName());
+        assertNull(failRule);
+    }
+
+    private void assertFieldRuleEqualsTo(
+            ConfigValue equalsTo, SeaTunnelDataType<?> type, Object expected) {
+        assertFieldRuleEqualsTo(equalsTo, type, expected, true);
+    }
+
+    private void assertFieldRuleEqualsTo(
+            ConfigValue equalsTo, SeaTunnelDataType<?> type, Object expected, boolean isEqualsTo) {
+        List<AssertFieldRule> rules = Lists.newArrayList();
+        AssertFieldRule rule = new AssertFieldRule();
+        rule.setFieldName("c_mock");
+        rule.setFieldType(type);
+
+        AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
+        valueRule.setEqualTo(equalsTo.unwrapped());
+
+        rule.setFieldRules(Collections.singletonList(valueRule));
+        rules.add(rule);
+
+        SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {expected});
+        SeaTunnelRowType mockType =
+                new SeaTunnelRowType(new String[] {"c_mock"}, new SeaTunnelDataType[] {type});
+
+        AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
+        if (isEqualsTo) {
+            assertNull(failRule);
+        } else {
+            assertNotNull(failRule);
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
index 494c3c4496..b4ebe7bb9e 100644
--- a/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
+++ b/seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java
@@ -22,11 +22,14 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
 import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,7 +41,7 @@ public class AssertRuleParserTest {
     public void testParseRules() {
         List<? extends Config> ruleConfigList = assembleConfig();
         List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
-        assertEquals(3, assertFieldRules.size());
+        assertEquals(4, assertFieldRules.size());
 
         AssertFieldRule nameRule = assertFieldRules.get(0);
         List<AssertFieldRule.AssertRule> nameValueRules = nameRule.getFieldRules();
@@ -70,7 +73,28 @@ public class AssertRuleParserTest {
         assertEquals(2, decimalValueRules.size());
         assertEquals(
                 AssertFieldRule.AssertRuleType.NOT_NULL, decimalValueRules.get(0).getRuleType());
-        assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
+        assertEquals("12.12", (String) decimalValueRules.get(1).getEqualTo());
+
+        AssertFieldRule rowRule = assertFieldRules.get(3);
+        List<AssertFieldRule.AssertRule> rowValueRules = rowRule.getFieldRules();
+        SeaTunnelRowType expectedRowType =
+                new SeaTunnelRowType(
+                        new String[] {"c_0"},
+                        new SeaTunnelDataType[] {
+                            new SeaTunnelRowType(
+                                    new String[] {"c_0_0"},
+                                    new SeaTunnelDataType[] {BasicType.INT_TYPE})
+                        });
+        assertEquals("c_row", rowRule.getFieldName());
+        assertEquals(expectedRowType, rowRule.getFieldType());
+        assertEquals(2, rowValueRules.size());
+        assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, rowValueRules.get(0).getRuleType());
+
+        final List<List<?>> cRow = (List<List<?>>) rowValueRules.get(1).getEqualTo();
+        assertEquals(1, cRow.size());
+        assertEquals(ArrayList.class, cRow.get(0).getClass());
+        assertEquals(1, ((List) cRow.get(0)).size());
+        assertEquals(1, ((Integer) ((List) cRow.get(0)).get(0)));
     }
 
     private List<? extends Config> assembleConfig() {
@@ -119,6 +143,17 @@ public class AssertRuleParserTest {
                         + "                    equals_to = \"12.12\"\n"
                         + "                }\n"
                         + "            ]\n"
+                        + "        },{\n"
+                        + "            field_name = c_row\n"
+                        + "            field_type= {c_0 = {c_0_0=int}}\n"
+                        + "            field_value = [\n"
+                        + "                {\n"
+                        + "                    rule_type = NOT_NULL\n"
+                        + "                },\n"
+                        + "                {\n"
+                        + "                    equals_to = [[1]]\n"
+                        + "                }\n"
+                        + "            ]\n"
                         + "        }\n"
                         + "        ]\n"
                         + "    \n"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
index fbdf6733a9..277d048a6d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/java/org/apache/seatunnel/e2e/connector/assertion/FakeSourceToAssertIT.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.e2e.connector.assertion;
 
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
@@ -35,4 +37,24 @@ public class FakeSourceToAssertIT extends TestSuiteBase {
                 container.executeJob("/assertion/fakesource_to_assert.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testFakeSourceToAssertRowSink(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/assertion/fake_row_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK},
+            disabledReason = "Currently FLINK engine unsupported NULL type")
+    public void testFakeFullTypesToAssertSink(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/assertion/fake_full_types_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf
new file mode 100644
index 0000000000..96cf30306e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_full_types_to_assert.conf
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = BATCH
+  # checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    row.num = 1
+    schema = {
+      fields {
+        c_null = "null"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_timestamp = timestamp
+        c_time = time
+        c_bytes = bytes
+        c_array = "array<int>"
+        c_map = "map<date, string>"
+        c_map_nest = "map<string, {c_int = int, c_string = string}>"
+        c_row = {
+          c_null = "null"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_date = date
+          c_timestamp = timestamp
+          c_time = time
+          c_bytes = bytes
+          c_array = "array<int>"
+          c_map = "map<string, string>"
+        }
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [
+          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+          "bWlJWmo=",
+          [0, 1, 2],
+          { "2024-01-26" = v0 },
+          { k1 = [123, "BBB-BB"]},
+          [
+            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+            "bWlJWmo=",
+            [0, 1, 2],
+            { k0 = v0 }
+          ]
+        ]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink{
+  Assert {
+    source_table_name = "fake"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          }
+        ],
+        field_rules = [
+            {
+                field_name = c_null
+                field_type = "null"
+                field_value = [
+                    {
+                        rule_type = NULL
+                    }
+                ]
+            },
+            {
+                field_name = c_string
+                field_type = string
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "AAA"
+                    }
+                ]
+            },
+            {
+                field_name = c_boolean
+                field_type = boolean
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = false
+                    }
+                ]
+            },
+            {
+                field_name = c_tinyint
+                field_type = tinyint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 1
+                    }
+                ]
+            },
+            {
+                field_name = c_smallint
+                field_type = smallint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 1
+                    }
+                ]
+            },
+            {
+                field_name = c_int
+                field_type = int
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 333
+                    }
+                ]
+            },
+            {
+                field_name = c_bigint
+                field_type = bigint
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 323232
+                    }
+                ]
+            },
+            {
+                field_name = c_float
+                field_type = float
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 3.1
+                    }
+                ]
+            },
+            {
+                field_name = c_double
+                field_type = double
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 9.33333
+                    }
+                ]
+            },
+            {
+                field_name = c_decimal
+                field_type = "decimal(30, 8)"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = 99999.99999999
+                    }
+                ]
+            },
+            {
+                field_name = c_date
+                field_type = date
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "2012-12-21"
+                    }
+                ]
+            },
+            {
+                field_name = c_timestamp
+                field_type = timestamp
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "2012-12-21T12:34:56"
+                    }
+                ]
+            },
+            {
+                field_name = c_time
+                field_type = time
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = "12:34:56"
+                    }
+                ]
+            },
+            {
+                field_name = c_bytes
+                field_type = bytes
+                field_value = [
+                      {
+                          rule_type = NOT_NULL
+                          equals_to = "bWlJWmo="
+                      }
+                ]
+            },
+            {
+                field_name = c_array
+                field_type = "array<int>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = [0, 1, 2]
+                    }
+                ]
+            },
+            {
+                field_name = c_map
+                field_type = "map<date, string>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = { "2024-01-26" = v0 }
+                    }
+                ]
+            },
+            {
+                field_name = c_map_nest
+                field_type = "map<string, {c_int = int, c_string = string}>"
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = { k1 = [123, "BBB-BB"] }
+                    }
+                ]
+            },
+            {
+                field_name = c_row
+                field_type = {
+                    c_null = "null"
+                    c_string = string
+                    c_boolean = boolean
+                    c_tinyint = tinyint
+                    c_smallint = smallint
+                    c_int = int
+                    c_bigint = bigint
+                    c_float = float
+                    c_double = double
+                    c_decimal = "decimal(30, 8)"
+                    c_date = date
+                    c_timestamp = timestamp
+                    c_time = time
+                    c_bytes = bytes
+                    c_array = "array<int>"
+                    c_map = "map<string, string>"
+                }
+                field_value = [
+                    {
+                        rule_type = NOT_NULL
+                        equals_to = [
+                           null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
+                           "bWlJWmo=",
+                           [0, 1, 2],
+                           { k0 = v0 }
+                        ]
+                    }
+                ]
+            }
+        ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf
new file mode 100644
index 0000000000..731724a417
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fake_row_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = BATCH
+  # checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    row.num = 1
+    schema = {
+      fields {
+        c_array = "array<int>"
+        c_map = "map<string, string>"
+        c_row = {
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_date = date
+          c_timestamp = timestamp
+        }
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [[0, 1, 2], { k0 = v0 }, ["AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56"]]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink{
+  Assert {
+    source_table_name = "fake"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          }
+        ],
+        field_rules = [
+        {
+            field_name = c_array
+            field_type = "array<int>"
+            field_value = [
+                {
+                    equals_to = [0, 1, 2]
+                }
+            ]
+        },
+        {
+          field_name = c_map
+          field_type = "map<string, string>"
+          field_value = [
+            {
+              equals_to = { k0 = v0 }
+            }
+          ]
+        },
+        {
+          field_name = c_row
+          field_type = {
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_date = date
+            c_timestamp = timestamp
+          }
+          field_value = [
+            {
+              equals_to = ["AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56"]
+            }
+          ]
+        }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
index 49fd9807df..58f30a56cc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_range.conf
@@ -79,7 +79,7 @@ sink {
         },
         {
           field_name = c_tinyint
-          field_type = byte
+          field_type = tinyint
           field_value = [
             {
               rule_type = MIN
@@ -93,7 +93,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               rule_type = MIN
@@ -121,7 +121,7 @@ sink {
         },
         {
           field_name = c_bigint
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
index 8a034f6094..9a1113b4b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_template.conf
@@ -80,7 +80,7 @@ sink {
         },
         {
           field_name = c_tinyint
-          field_type = byte
+          field_type = tinyint
           field_value = [
             {
               rule_type = MIN
@@ -94,7 +94,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               rule_type = MIN
@@ -122,7 +122,7 @@ sink {
         },
         {
           field_name = c_bigint
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
index cfc734e6bf..e03ff0e52c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
@@ -119,7 +119,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               equals_to = 15920
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
index a70a51ee87..a4d0f4468e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
@@ -118,7 +118,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               equals_to = 13846
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
index a17fd8679b..05d50539c3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_file_json_lzo_to_console.conf
@@ -121,7 +121,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               equals_to = 15920
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
index 7417e74d27..9709dd15e2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_text_lzo_to_assert.conf
@@ -120,7 +120,7 @@ sink {
         },
         {
           field_name = c_smallint
-          field_type = short
+          field_type = smallint
           field_value = [
             {
               equals_to = 13846
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index 969e85e123..58cf96fe38 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -64,7 +64,7 @@ sink {
         field_rules = [
           {
             field_name = f1
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
index 97637f9a04..35e27e7718 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -67,7 +67,7 @@ sink {
       field_rules = [
         {
           field_name = f1
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index 0f2b909c12..b8dd877997 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -65,17 +65,17 @@ sink{
         },
         {
           field_name = hive_e2e_source_table.bigint_column
-          field_type = long
+          field_type = bigint
           field_value = [{equals_to = 1234567890}]
         },
         {
           field_name = hive_e2e_source_table.smallint_column
-          field_type = short
+          field_type = smallint
           field_value = [{equals_to = 32767}]
          },
         {
           field_name = hive_e2e_source_table.tinyint_column
-          field_type = byte
+          field_type = tinyint
           field_value = [{equals_to = 127}]
          },
         {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index 05c64e0ebe..31fe77a3e2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -65,7 +65,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index 58b3474065..f9a41e7987 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -64,7 +64,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 16251c344f..b5d63d204e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -55,7 +55,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index 11b7b35042..b6db50989a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -74,7 +74,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 3419175228..45b29d1915 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -74,7 +74,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 57129b040c..dabbbdb7db 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -55,7 +55,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 100de6d2b6..46a8c5f0dc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -53,7 +53,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index cb898a759d..39c16a2a3b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -57,7 +57,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 26ee02d614..bab9f197c0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -54,7 +54,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index 4d4d8536c7..36f01c0337 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -66,7 +66,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
index d757a36cb5..d7f875272b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -127,7 +127,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
index 0e1aa6a949..beff2cfac6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -103,7 +103,7 @@ sink {
           },
           {
             field_name = c_tinyint
-            field_type = byte
+            field_type = tinyint
             field_value = [
               {
                 rule_type = NOT_NULL
@@ -112,7 +112,7 @@ sink {
           },
           {
             field_name = c_smallint
-            field_type = short
+            field_type = smallint
             field_value = [
               {
                 rule_type = NOT_NULL
@@ -130,7 +130,7 @@ sink {
           },
           {
             field_name = c_bigint
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index b52dd20eb4..9ba89caf86 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -69,7 +69,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
               {
                 rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
index b60de4284f..09ab946964 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -70,7 +70,7 @@ sink {
       field_rules = [
         {
           field_name = id
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = NOT_NULL
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
index d047e310ba..b7c9aff4f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -48,7 +48,7 @@ sink {
       field_rules = [
         {
           field_name = id
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
index b631e243b1..eabf53242b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -47,7 +47,7 @@ sink {
       field_rules = [
         {
           field_name = id
-          field_type = long
+          field_type = bigint
           field_value = [
 
             {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
index fac6fe305c..d9a6a51265 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -48,7 +48,7 @@ sink {
       field_rules = [
         {
           field_name = id
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
index 833ed1621d..cee9f1e273 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -52,7 +52,7 @@ sink {
       field_rules = [
         {
           field_name = id
-          field_type = long
+          field_type = bigint
           field_value = [
             {
               rule_type = MIN
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
index d3ce2a72df..0de010a26c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -50,7 +50,7 @@ sink {
         field_rules = [
           {
             field_name = id
-            field_type = long
+            field_type = bigint
             field_value = [
 
               {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
index a91a9eff74..042b66ad25 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_datetime.conf
@@ -123,49 +123,49 @@ sink {
         },
         {
           field_name = "test"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 283}
           ]
         },
         {
           field_name = "c2_1"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 283}
           ]
         },
         {
           field_name = "c2_2"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 0}
           ]
         },
         {
           field_name = "c2_3"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 9}
           ]
         },
         {
           field_name = "c2_4"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 6791}
           ]
         },
         {
           field_name = "c2_5"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 407460}
           ]
         },
         {
           field_name = "c2_6"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 24447611}
           ]
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
index 2cf38bb4d3..ebeea3659c 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_string.conf
@@ -79,21 +79,21 @@ sink {
         },
         {
           field_name = "c4_1"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 64}
           ]
         },
         {
           field_name = "c4_2"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 8}
           ]
         },
         {
           field_name = "c4_3"
-          field_type = "long"
+          field_type = bigint
           field_value = [
             {equals_to = 8}
           ]
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 648b2c98af..dd91ac5a87 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import java.io.IOException;
@@ -351,6 +352,7 @@ public class JsonToRowConverters implements Serializable {
     }
 
     private JsonToRowConverter createMapConverter(MapType<?, ?> type) {
+        JsonToRowConverter keyConverter = createConverter(type.getKeyType());
         JsonToRowConverter valueConverter = createConverter(type.getValueType());
         return new JsonToRowConverter() {
             @Override
@@ -361,8 +363,17 @@ public class JsonToRowConverters implements Serializable {
                                 new Consumer<Map.Entry<String, JsonNode>>() {
                                     @Override
                                     public void accept(Map.Entry<String, JsonNode> entry) {
+                                        JsonNode keyNode;
+                                        try {
+                                            keyNode =
+                                                    JsonUtils.stringToJsonNode(
+                                                            JsonUtils.toJsonString(entry.getKey()));
+                                        } catch (Exception e) {
+                                            throw CommonError.jsonOperationError(
+                                                    FORMAT, entry.getKey(), e);
+                                        }
                                         value.put(
-                                                entry.getKey(),
+                                                keyConverter.convert(keyNode),
                                                 valueConverter.convert(entry.getValue()));
                                     }
                                 });
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index 109f8b496a..7a81625811 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.seatunnel.format.json;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,23 +32,30 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalQueries;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.seatunnel.api.table.type.ArrayType.INT_ARRAY_TYPE;
 import static org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
 import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -391,4 +401,84 @@ public class JsonRowDataSerDeSchemaTest {
 
         assertEquals(actual.getMessage(), expected.getMessage());
     }
+
+    @Test
+    public void testMapConverterKeyType() throws JsonProcessingException {
+        MapType<String, String> stringKeyMapType = new MapType<>(STRING_TYPE, STRING_TYPE);
+        MapType<Boolean, String> booleanKeyMapType = new MapType<>(BOOLEAN_TYPE, STRING_TYPE);
+        MapType<Byte, String> tinyintKeyMapType = new MapType<>(BYTE_TYPE, STRING_TYPE);
+        MapType<Short, String> smallintKeyMapType = new MapType<>(SHORT_TYPE, STRING_TYPE);
+        MapType<Integer, String> intKeyMapType = new MapType<>(INT_TYPE, STRING_TYPE);
+        MapType<Long, String> bigintKeyMapType = new MapType<>(LONG_TYPE, STRING_TYPE);
+        MapType<Float, String> floatKeyMapType = new MapType<>(FLOAT_TYPE, STRING_TYPE);
+        MapType<Double, String> doubleKeyMapType = new MapType<>(DOUBLE_TYPE, STRING_TYPE);
+        MapType<LocalDate, String> dateKeyMapType =
+                new MapType<>(LocalTimeType.LOCAL_DATE_TYPE, STRING_TYPE);
+        MapType<LocalTime, String> timeKeyMapType =
+                new MapType<>(LocalTimeType.LOCAL_TIME_TYPE, STRING_TYPE);
+        MapType<LocalDateTime, String> timestampKeyMapType =
+                new MapType<>(LocalTimeType.LOCAL_DATE_TIME_TYPE, STRING_TYPE);
+        MapType<BigDecimal, String> decimalKeyMapType =
+                new MapType<>(new DecimalType(10, 2), STRING_TYPE);
+
+        JsonToRowConverters converters = new JsonToRowConverters(true, false);
+
+        JsonToRowConverters.JsonToRowConverter stringConverter =
+                converters.createConverter(stringKeyMapType);
+        JsonToRowConverters.JsonToRowConverter booleanConverter =
+                converters.createConverter(booleanKeyMapType);
+        JsonToRowConverters.JsonToRowConverter tinyintConverter =
+                converters.createConverter(tinyintKeyMapType);
+        JsonToRowConverters.JsonToRowConverter smallintConverter =
+                converters.createConverter(smallintKeyMapType);
+        JsonToRowConverters.JsonToRowConverter intConverter =
+                converters.createConverter(intKeyMapType);
+        JsonToRowConverters.JsonToRowConverter bigintConverter =
+                converters.createConverter(bigintKeyMapType);
+        JsonToRowConverters.JsonToRowConverter floatConverter =
+                converters.createConverter(floatKeyMapType);
+        JsonToRowConverters.JsonToRowConverter doubleConverter =
+                converters.createConverter(doubleKeyMapType);
+        JsonToRowConverters.JsonToRowConverter dateConverter =
+                converters.createConverter(dateKeyMapType);
+        JsonToRowConverters.JsonToRowConverter timeConverter =
+                converters.createConverter(timeKeyMapType);
+        JsonToRowConverters.JsonToRowConverter timestampConverter =
+                converters.createConverter(timestampKeyMapType);
+        JsonToRowConverters.JsonToRowConverter decimalConverter =
+                converters.createConverter(decimalKeyMapType);
+
+        assertMapKeyType("{\"abc\": \"xxx\"}", stringConverter, "abc");
+        assertMapKeyType("{\"false\": \"xxx\"}", booleanConverter, false);
+        assertMapKeyType("{\"1\": \"xxx\"}", tinyintConverter, (byte) 1);
+        assertMapKeyType("{\"12\": \"xxx\"}", smallintConverter, (short) 12);
+        assertMapKeyType("{\"123\": \"xxx\"}", intConverter, 123);
+        assertMapKeyType("{\"12345\": \"xxx\"}", bigintConverter, 12345L);
+        assertMapKeyType("{\"1.0001\": \"xxx\"}", floatConverter, 1.0001f);
+        assertMapKeyType("{\"999.9999\": \"xxx\"}", doubleConverter, 999.9999);
+        assertMapKeyType("{\"9999.23\": \"xxx\"}", decimalConverter, BigDecimal.valueOf(9999.23));
+
+        LocalDate date =
+                DateTimeFormatter.ISO_LOCAL_DATE
+                        .parse("2024-01-26")
+                        .query(TemporalQueries.localDate());
+        assertMapKeyType("{\"2024-01-26\": \"xxx\"}", dateConverter, date);
+
+        LocalTime time =
+                JsonToRowConverters.TIME_FORMAT
+                        .parse("12:00:12.001")
+                        .query(TemporalQueries.localTime());
+        assertMapKeyType("{\"12:00:12.001\": \"xxx\"}", timeConverter, time);
+
+        LocalDateTime timestamp = LocalDateTime.of(date, time);
+        assertMapKeyType("{\"2024-01-26T12:00:12.001\": \"xxx\"}", timestampConverter, timestamp);
+    }
+
+    private void assertMapKeyType(
+            String payload, JsonToRowConverters.JsonToRowConverter converter, Object expect)
+            throws JsonProcessingException {
+        JsonNode keyMapNode = JsonUtils.stringToJsonNode(payload);
+        Map<?, ?> keyMap = (Map<?, ?>) converter.convert(keyMapNode);
+        assertEquals(expect, keyMap.keySet().iterator().next());
+    }
 }
