This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aefc135195 [Feature][Spark] Support SeaTunnel Time Type. (#5188)
aefc135195 is described below

commit aefc135195f5a10e37aec84e0b7c29708342cfcd
Author: Chengyu Yan <[email protected]>
AuthorDate: Wed Oct 18 17:44:11 2023 +0800

    [Feature][Spark] Support SeaTunnel Time Type. (#5188)
---
 release-note.md                                    |  1 +
 .../spark/execution/SinkExecuteProcessor.java      |  7 +++-
 .../spark/execution/SinkExecuteProcessor.java      |  7 +++-
 .../spark/execution/SourceExecuteProcessor.java    |  5 +--
 .../spark/execution/TransformExecuteProcessor.java | 45 +++++++++++-----------
 .../resources/assertion/fakesource_to_assert.conf  | 16 ++++++--
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     |  5 +++
 .../test/resources/jdbc_mysql_source_and_sink.conf |  6 +--
 .../jdbc_mysql_source_and_sink_parallel.conf       |  6 +--
 ...mysql_source_and_sink_parallel_upper_lower.conf |  6 +--
 .../resources/jdbc_mysql_source_and_sink_xa.conf   |  6 +--
 .../src/test/resources/sql_transform.conf          | 12 +++++-
 .../spark/serialization/InternalRowConverter.java  | 13 ++++---
 .../spark/serialization/SeaTunnelRowConverter.java | 15 ++++++--
 .../spark/utils/TypeConverterUtils.java            | 29 ++++++++++----
 15 files changed, 116 insertions(+), 63 deletions(-)

diff --git a/release-note.md b/release-note.md
index 6a777a12d8..bfb7d03856 100644
--- a/release-note.md
+++ b/release-note.md
@@ -151,6 +151,7 @@
 - [Core] [API] Add copy method to Catalog codes (#4414)
 - [Core] [API] Add options check before create source and sink and transform 
in FactoryUtil (#4424)
 - [Core] [Shade] Add guava shade module (#4358)
+- [Core] [Spark] Support SeaTunnel Time Type (#5188)
 - [Core] [Flink] Support Decimal Type with configurable precision and scale 
(#5419)
 
 ### Connector-V2
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 0e3b18fb7d..5546890e83 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -37,7 +38,6 @@ import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -94,7 +94,10 @@ public class SinkExecuteProcessor
             DatasetTableInfo datasetTableInfo =
                     fromSourceTable(sinkConfig, sparkRuntimeEnvironment, 
upstreamDataStreams)
                             .orElse(input);
+            SeaTunnelDataType<?> inputType =
+                    datasetTableInfo.getCatalogTable().getSeaTunnelRowType();
             Dataset<Row> dataset = datasetTableInfo.getDataset();
+
             int parallelism;
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -120,7 +123,7 @@ public class SinkExecuteProcessor
                                         
sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                sink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
+                sink.setTypeInfo((SeaTunnelRowType) inputType);
             } else {
                 TableSinkFactoryContext context =
                         new TableSinkFactoryContext(
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index c85a10389d..b3d978b1cb 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -37,7 +38,6 @@ import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -95,7 +95,10 @@ public class SinkExecuteProcessor
             DatasetTableInfo datasetTableInfo =
                     fromSourceTable(sinkConfig, sparkRuntimeEnvironment, 
upstreamDataStreams)
                             .orElse(input);
+            SeaTunnelDataType<?> inputType =
+                    datasetTableInfo.getCatalogTable().getSeaTunnelRowType();
             Dataset<Row> dataset = datasetTableInfo.getDataset();
+
             int parallelism;
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -121,7 +124,7 @@ public class SinkExecuteProcessor
                                         
sinkConfig.getString(PLUGIN_NAME.key())),
                                 sinkConfig);
                 sink.setJobContext(jobContext);
-                sink.setTypeInfo((SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
+                sink.setTypeInfo((SeaTunnelRowType) inputType);
             } else {
                 TableSinkFactoryContext context =
                         new TableSinkFactoryContext(
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 126f14edb1..a951b90c97 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -76,6 +76,7 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
                                         CommonOptions.PARALLELISM.key(),
                                         
CommonOptions.PARALLELISM.defaultValue());
             }
+            StructType schema = (StructType) 
TypeConverterUtils.convert(source.getProducedType());
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
@@ -85,9 +86,7 @@ public class SourceExecuteProcessor extends 
SparkAbstractPluginExecuteProcessor<
                             .option(
                                     Constants.SOURCE_SERIALIZATION,
                                     SerializationUtils.objectToString(source))
-                            .schema(
-                                    (StructType)
-                                            
TypeConverterUtils.convert(source.getProducedType()))
+                            .schema(schema)
                             .load();
             sources.add(
                     new DatasetTableInfo(
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index da76e027db..3a731eaee2 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -107,7 +108,7 @@ public class TransformExecuteProcessor
                 
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
                 SeaTunnelTransform transform = 
factory.createTransform(context).createTransform();
 
-                Dataset<Row> inputDataset = sparkTransform(transform, 
dataset.getDataset());
+                Dataset<Row> inputDataset = sparkTransform(transform, dataset);
                 registerInputTempView(pluginConfig, inputDataset);
                 upstreamDataStreams.add(
                         new DatasetTableInfo(
@@ -127,33 +128,33 @@ public class TransformExecuteProcessor
         return upstreamDataStreams;
     }
 
-    private Dataset<Row> sparkTransform(SeaTunnelTransform transform, 
Dataset<Row> stream)
+    private Dataset<Row> sparkTransform(SeaTunnelTransform transform, 
DatasetTableInfo tableInfo)
             throws IOException {
-        SeaTunnelDataType<?> seaTunnelDataType = 
TypeConverterUtils.convert(stream.schema());
+        Dataset<Row> stream = tableInfo.getDataset();
+        SeaTunnelDataType<?> seaTunnelDataType = 
tableInfo.getCatalogTable().getSeaTunnelRowType();
         transform.setTypeInfo(seaTunnelDataType);
-        StructType structType =
+        StructType outputSchema =
                 (StructType) 
TypeConverterUtils.convert(transform.getProducedType());
         SeaTunnelRowConverter inputRowConverter = new 
SeaTunnelRowConverter(seaTunnelDataType);
         SeaTunnelRowConverter outputRowConverter =
                 new SeaTunnelRowConverter(transform.getProducedType());
-        ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
-        return stream.mapPartitions(
-                        (MapPartitionsFunction<Row, Row>)
-                                (Iterator<Row> rowIterator) -> {
-                                    TransformIterator iterator =
-                                            new TransformIterator(
-                                                    rowIterator,
-                                                    transform,
-                                                    structType,
-                                                    inputRowConverter,
-                                                    outputRowConverter);
-                                    return iterator;
-                                },
-                        encoder)
-                .filter(
-                        (Row row) -> {
-                            return row != null;
-                        });
+        ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
+        Dataset<Row> result =
+                stream.mapPartitions(
+                                (MapPartitionsFunction<Row, Row>)
+                                        (Iterator<Row> rowIterator) -> {
+                                            TransformIterator iterator =
+                                                    new TransformIterator(
+                                                            rowIterator,
+                                                            transform,
+                                                            outputSchema,
+                                                            inputRowConverter,
+                                                            
outputRowConverter);
+                                            return iterator;
+                                        },
+                                encoder)
+                        .filter(Objects::nonNull);
+        return result;
     }
 
     private static class TransformIterator implements Iterator<Row>, 
Serializable {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index edd7843286..ba2e311eee 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -35,6 +35,7 @@ source {
       fields {
         name = "string"
         age = "int"
+        c_time = "time"
       }
     }
   }
@@ -46,7 +47,7 @@ transform {
   Filter {
     source_table_name = "fake"
     result_table_name = "fake1"
-    fields = ["name", "age"]
+    fields = ["name", "age", "c_time"]
   }
 }
 
@@ -97,10 +98,17 @@ sink {
               rule_value = 2147483647
             }
           ]
+        }, {
+          field_name = c_time
+          field_type = time
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
         }
-        ]
-      }
-
+      ]
+    }
   }
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index d8fcda8512..c6a8a9eedc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -60,9 +60,11 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.SQLException;
+import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -127,6 +129,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                     + "    `c_longtext`             longtext,\n"
                     + "    `c_date`                 date                  
DEFAULT NULL,\n"
                     + "    `c_datetime`             datetime              
DEFAULT NULL,\n"
+                    + "    `c_time`                 time                  
DEFAULT NULL,\n"
                     + "    `c_timestamp`            timestamp NULL        
DEFAULT NULL,\n"
                     + "    `c_tinyblob`             tinyblob,\n"
                     + "    `c_mediumblob`           mediumblob,\n"
@@ -221,6 +224,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                     "c_longtext",
                     "c_date",
                     "c_datetime",
+                    "c_time",
                     "c_timestamp",
                     "c_tinyblob",
                     "c_mediumblob",
@@ -278,6 +282,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
                                 String.format("f1_%s", i),
                                 Date.valueOf(LocalDate.now()),
                                 Timestamp.valueOf(LocalDateTime.now()),
+                                Time.valueOf(LocalTime.now()),
                                 new Timestamp(System.currentTimeMillis()),
                                 "test".getBytes(),
                                 "test".getBytes(),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
index 89309310bd..851af7b3f5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -50,12 +50,12 @@ sink {
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
-                                                c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
+                                                c_datetime, c_time, 
c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
                                                 c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?);"""
     properties {
      useSSL=false
      rewriteBatchedStatements=true
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
index c393e69cee..5bec8bc546 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
@@ -49,8 +49,8 @@ sink {
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
-                                                c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
+                                                c_datetime, c_time, 
c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
                                                 c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?);"""
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index 0460ccdf3b..1b092f1e91 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -50,8 +50,8 @@ sink {
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
-                                                c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
+                                                c_datetime, c_time, 
c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
                                                 c_binary, c_year, 
c_int_unsigned, 
c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?);"""
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?);"""
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
index 810f6c5076..cfdf7691d0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
@@ -50,9 +50,9 @@ sink {
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
-                                                c_datetime, c_timestamp, 
c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
+                                                c_datetime, c_time, 
c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
                                                 c_binary, c_year, 
c_int_unsigned, c_integer_unsigned)
-                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
+                   values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
 
     # Non-root users need to grant XA_RECOVER_ADMIN permission on 
is_exactly_once = "true"
     is_exactly_once = "true"
@@ -65,4 +65,4 @@ sink {
       rewriteBatchedStatements=true
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
index 78e21280f0..33a5a53150 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf
@@ -31,6 +31,7 @@ source {
         id = "int"
         name = "string"
         age = "int"
+        c_time = "time"
         c_timestamp = "timestamp"
         c_date = "date"
         c_map = "map<string, string>"
@@ -51,7 +52,7 @@ transform {
     source_table_name = "fake"
     result_table_name = "fake1"
     # the query table name must same as field 'source_table_name'
-    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, 
pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
+    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, 
pi() as pi, c_time, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from 
fake"
   }
   # The SQL transform support base function and criteria operation
   # But the complex SQL unsupported yet, include: multi source table/rows JOIN 
and AGGREGATE operation and the like
@@ -116,6 +117,15 @@ sink {
               }
             ]
           },
+          {
+            field_name = c_time
+            field_type = time
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
           {
             field_name = c_timestamp
             field_type = timestamp
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
index 7f8fadc4e4..79269ecf8e 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java
@@ -57,6 +57,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -87,9 +88,7 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
             case DATE:
                 return (int) ((LocalDate) field).toEpochDay();
             case TIME:
-                // TODO: Support TIME Type
-                throw new RuntimeException(
-                        "time type is not supported now, but will be supported 
in the future.");
+                return ((LocalTime) field).toNanoOfDay();
             case TIMESTAMP:
                 return InstantConverterUtils.toEpochMicro(
                         Timestamp.valueOf((LocalDateTime) field).toInstant());
@@ -202,6 +201,7 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
             case DATE:
                 return new MutableInt();
             case BIGINT:
+            case TIME:
             case TIMESTAMP:
                 return new MutableLong();
             case FLOAT:
@@ -231,9 +231,10 @@ public final class InternalRowConverter extends 
RowConverter<InternalRow> {
                 }
                 return LocalDate.ofEpochDay((int) field);
             case TIME:
-                // TODO: Support TIME Type
-                throw new RuntimeException(
-                        "SeaTunnel not support time type, it will be supported 
in the future.");
+                if (field instanceof Timestamp) {
+                    return LocalTime.ofNanoOfDay(((Timestamp) 
field).getNanos());
+                }
+                return LocalTime.ofNanoOfDay((long) field);
             case TIMESTAMP:
                 if (field instanceof Timestamp) {
                     return ((Timestamp) field).toLocalDateTime();
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
index 15357204cd..000e0baa06 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
@@ -36,7 +36,6 @@ import scala.collection.mutable.WrappedArray;
 
 import java.io.IOException;
 import java.sql.Date;
-import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -51,6 +50,7 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
         super(dataType);
     }
 
+    // SeaTunnelRow To GenericRow
     @Override
     public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException {
         validate(seaTunnelRow);
@@ -75,7 +75,12 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
             case TIMESTAMP:
                 return Timestamp.valueOf((LocalDateTime) field);
             case TIME:
-                return Time.valueOf((LocalTime) field);
+                if (field instanceof LocalTime) {
+                    return ((LocalTime) field).toNanoOfDay();
+                }
+                if (field instanceof Long) {
+                    return field;
+                }
             case STRING:
                 return field.toString();
             case MAP:
@@ -145,6 +150,7 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
         return new WrappedArray.ofRef<>(arrayData);
     }
 
+    // GenericRow To SeaTunnelRow
     @Override
     public SeaTunnelRow reconvert(SeaTunnelRow engineRow) throws IOException {
         return (SeaTunnelRow) reconvert(engineRow, dataType);
@@ -166,7 +172,10 @@ public class SeaTunnelRowConverter extends 
RowConverter<SeaTunnelRow> {
             case TIMESTAMP:
                 return ((Timestamp) field).toLocalDateTime();
             case TIME:
-                return ((Time) field).toLocalTime();
+                if (field instanceof Timestamp) {
+                    return ((Timestamp) field).toLocalDateTime().toLocalTime();
+                }
+                return LocalTime.ofNanoOfDay((Long) field);
             case STRING:
                 return field.toString();
             case MAP:
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
index 72fba02382..a4dbcd0c72 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
@@ -25,10 +25,12 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
@@ -38,9 +40,11 @@ import java.util.Map;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 public class TypeConverterUtils {
+
     private static final Map<DataType, SeaTunnelDataType<?>> 
TO_SEA_TUNNEL_TYPES =
             new HashMap<>(16);
     public static final String ROW_KIND_FIELD = "op";
+    public static final String LOGICAL_TIME_TYPE_FLAG = "logical_time_type";
 
     static {
         TO_SEA_TUNNEL_TYPES.put(DataTypes.NullType, BasicType.VOID_TYPE);
@@ -87,8 +91,8 @@ public class TypeConverterUtils {
                 return DataTypes.BinaryType;
             case DATE:
                 return DataTypes.DateType;
-                // case TIME:
-                // TODO: not support now, how reconvert?
+            case TIME:
+                return DataTypes.LongType;
             case TIMESTAMP:
                 return DataTypes.TimestampType;
             case ARRAY:
@@ -113,12 +117,14 @@ public class TypeConverterUtils {
         // TODO: row kind
         StructField[] fields = new StructField[rowType.getFieldNames().length];
         for (int i = 0; i < rowType.getFieldNames().length; i++) {
+            SeaTunnelDataType<?> fieldType = rowType.getFieldTypes()[i];
+            Metadata metadata =
+                    fieldType.getSqlType() == SqlType.TIME
+                            ? new 
MetadataBuilder().putBoolean(LOGICAL_TIME_TYPE_FLAG, true).build()
+                            : Metadata.empty();
+
             fields[i] =
-                    new StructField(
-                            rowType.getFieldNames()[i],
-                            convert(rowType.getFieldTypes()[i]),
-                            true,
-                            Metadata.empty());
+                    new StructField(rowType.getFieldNames()[i], 
convert(fieldType), true, metadata);
         }
         return new StructType(fields);
     }
@@ -178,7 +184,14 @@ public class TypeConverterUtils {
         SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[structFields.length];
         for (int i = 0; i < structFields.length; i++) {
             fieldNames[i] = structFields[i].name();
-            fieldTypes[i] = convert(structFields[i].dataType());
+            Metadata metadata = structFields[i].metadata();
+            if (metadata != null
+                    && metadata.contains(LOGICAL_TIME_TYPE_FLAG)
+                    && metadata.getBoolean(LOGICAL_TIME_TYPE_FLAG)) {
+                fieldTypes[i] = LocalTimeType.LOCAL_TIME_TYPE;
+            } else {
+                fieldTypes[i] = convert(structFields[i].dataType());
+            }
         }
         return new SeaTunnelRowType(fieldNames, fieldTypes);
     }


Reply via email to