This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new aeeed9a4 [FLINK-30820] Support varchar/char in spark aeeed9a4 is described below commit aeeed9a490ae35589662e96b035fc732fdf4364a Author: shammon <zjur...@gmail.com> AuthorDate: Tue Jan 31 12:20:40 2023 +0800 [FLINK-30820] Support varchar/char in spark This closes #490 --- .../apache/flink/table/store/spark/SparkTypeUtils.java | 4 ++++ .../flink/table/store/spark/SparkReadITCase.java | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java index 6dc51c2b..d4fb0a44 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java @@ -279,6 +279,10 @@ public class SparkTypeUtils { return new DoubleType(); } else if (atomic instanceof org.apache.spark.sql.types.StringType) { return new VarCharType(VarCharType.MAX_LENGTH); + } else if (atomic instanceof org.apache.spark.sql.types.VarcharType) { + return new VarCharType(((org.apache.spark.sql.types.VarcharType) atomic).length()); + } else if (atomic instanceof org.apache.spark.sql.types.CharType) { + return new CharType(((org.apache.spark.sql.types.CharType) atomic).length()); } else if (atomic instanceof org.apache.spark.sql.types.DateType) { return new DateType(); } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) { diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java index 3639bd86..c70e246c 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java @@ -112,6 +112,24 @@ public class SparkReadITCase extends SparkReadTestBase { .isEqualTo("[[tablestore,default]]"); } + @Test + public void testCreateTable() { + spark.sql( + "CREATE TABLE tablestore.default.testCreateTable(\n" + + "a BIGINT,\n" + + "b VARCHAR(10),\n" + + "c CHAR(10))"); + assertThat( + spark.sql("SELECT fields FROM tablestore.default.`testCreateTable$schemas`") + .collectAsList() + .toString()) + .isEqualTo( + "[[[" + + "{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT\"}," + + "{\"id\":1,\"name\":\"b\",\"type\":\"VARCHAR(10)\"}," + + "{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]"); + } + @Test public void testCreateTableWithNullablePk() { spark.sql("USE tablestore");