This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new aeeed9a4 [FLINK-30820] Support varchar/char in spark
aeeed9a4 is described below

commit aeeed9a490ae35589662e96b035fc732fdf4364a
Author: shammon <zjur...@gmail.com>
AuthorDate: Tue Jan 31 12:20:40 2023 +0800

    [FLINK-30820] Support varchar/char in spark
    
    This closes #490
---
 .../apache/flink/table/store/spark/SparkTypeUtils.java |  4 ++++
 .../flink/table/store/spark/SparkReadITCase.java       | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
index 6dc51c2b..d4fb0a44 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -279,6 +279,10 @@ public class SparkTypeUtils {
                 return new DoubleType();
             } else if (atomic instanceof org.apache.spark.sql.types.StringType) {
                 return new VarCharType(VarCharType.MAX_LENGTH);
+            } else if (atomic instanceof org.apache.spark.sql.types.VarcharType) {
+                return new VarCharType(((org.apache.spark.sql.types.VarcharType) atomic).length());
+            } else if (atomic instanceof org.apache.spark.sql.types.CharType) {
+                return new CharType(((org.apache.spark.sql.types.CharType) atomic).length());
             } else if (atomic instanceof org.apache.spark.sql.types.DateType) {
                 return new DateType();
             } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) {
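
Aside (not part of the patch): the two new branches simply carry the declared length through from Spark's VarcharType/CharType. Below is a minimal, self-contained sketch of that mapping, written as a string-producing helper so it compiles with only spark-sql on the classpath; the helper name and the standalone-class framing are illustrative, not the actual SparkTypeUtils API.

// Standalone sketch of the new SparkTypeUtils branches. The real code returns
// table store VarCharType/CharType instances; this helper returns type names
// only so the example needs no table store imports.
import org.apache.spark.sql.types.CharType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.VarcharType;

public class SparkCharVarcharMappingSketch {

    static String toTableStoreTypeName(DataType atomic) {
        if (atomic instanceof StringType) {
            // Pre-existing behavior: plain STRING maps to VARCHAR(VarCharType.MAX_LENGTH).
            return "VARCHAR(MAX_LENGTH)";
        } else if (atomic instanceof VarcharType) {
            // New in this commit: keep the declared length of a Spark VARCHAR(n).
            return "VARCHAR(" + ((VarcharType) atomic).length() + ")";
        } else if (atomic instanceof CharType) {
            // New in this commit: keep the declared length of a Spark CHAR(n).
            return "CHAR(" + ((CharType) atomic).length() + ")";
        }
        throw new UnsupportedOperationException("Unsupported Spark type: " + atomic);
    }

    public static void main(String[] args) {
        System.out.println(toTableStoreTypeName(new VarcharType(10))); // VARCHAR(10)
        System.out.println(toTableStoreTypeName(new CharType(10)));    // CHAR(10)
    }
}
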
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 3639bd86..c70e246c 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -112,6 +112,24 @@ public class SparkReadITCase extends SparkReadTestBase {
                 .isEqualTo("[[tablestore,default]]");
     }
 
+    @Test
+    public void testCreateTable() {
+        spark.sql(
+                "CREATE TABLE tablestore.default.testCreateTable(\n"
+                        + "a BIGINT,\n"
+                        + "b VARCHAR(10),\n"
+                        + "c CHAR(10))");
+        assertThat(
+                        spark.sql("SELECT fields FROM tablestore.default.`testCreateTable$schemas`")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        "[[["
+                                + "{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT\"},"
+                                + "{\"id\":1,\"name\":\"b\",\"type\":\"VARCHAR(10)\"},"
+                                + "{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]");
+    }
+
     @Test
     public void testCreateTableWithNullablePk() {
         spark.sql("USE tablestore");
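
Outside the patch itself, a quick way to see the effect from a standalone Spark job (a sketch only: the catalog registration keys, warehouse path, database, and table name mirror the ITCase setup and are assumptions, not something this commit prescribes):

// Illustrative usage only. The catalog class and the "warehouse" option mirror
// what the Spark ITCases configure; adjust names and paths to your environment.
import org.apache.spark.sql.SparkSession;

public class CreateCharVarcharTable {
    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder()
                        .master("local[2]")
                        .config(
                                "spark.sql.catalog.tablestore",
                                "org.apache.flink.table.store.spark.SparkCatalog")
                        .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
                        .getOrCreate();

        // With this commit, the declared lengths survive the Spark -> table store
        // type conversion; previously VarcharType/CharType were not handled at all.
        spark.sql("CREATE TABLE tablestore.default.t (a BIGINT, b VARCHAR(10), c CHAR(10))");
        spark.sql("SELECT fields FROM tablestore.default.`t$schemas`").show(false);

        spark.stop();
    }
}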
