This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource 15ddb221 is described below commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443 Author: Nicholas Jiang <programg...@163.com> AuthorDate: Fri Jul 15 15:02:59 2022 +0800 [FLINK-28560] Support Spark 3.3 profile for SparkSource This closes #217 --- docs/content/docs/engines/overview.md | 1 + flink-table-store-spark/pom.xml | 6 ++++ .../flink/table/store/spark/SparkCatalog.java | 28 +++++++++++++++++- .../flink/table/store/spark/SparkTypeTest.java | 34 +++++++++++----------- 4 files changed, 51 insertions(+), 18 deletions(-) diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md index dc4a3f1c..0bbc14ad 100644 --- a/docs/content/docs/engines/overview.md +++ b/docs/content/docs/engines/overview.md @@ -41,5 +41,6 @@ Apache Hive and Apache Spark. | Spark | 3.0 | read | Projection, Filter | | Spark | 3.1 | read | Projection, Filter | | Spark | 3.2 | read | Projection, Filter | +| Spark | 3.3 | read | Projection, Filter | | Trino | 358 | read | Projection, Filter | | Trino | 388 | read | Projection, Filter | \ No newline at end of file diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml index 66609af6..1a29f56a 100644 --- a/flink-table-store-spark/pom.xml +++ b/flink-table-store-spark/pom.xml @@ -76,6 +76,12 @@ under the License. 
<!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions --> <profiles> + <profile> + <id>spark-3.3</id> + <properties> + <spark.version>3.3.0</spark.version> + </properties> + </profile> <profile> <id>spark-3.2</id> <properties> diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java index b53cb33f..65f1387b 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java @@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces { throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet."); } - @Override + /** + * Drop a namespace from the catalog, recursively dropping all objects within the namespace. + * This interface implementation only supports the Spark 3.0, 3.1 and 3.2. + * + * <p>If the catalog implementation does not support this operation, it may throw {@link + * UnsupportedOperationException}. + * + * @param namespace a multi-part namespace + * @return true if the namespace was dropped + * @throws UnsupportedOperationException If drop is not a supported operation + */ public boolean dropNamespace(String[] namespace) { + return dropNamespace(namespace, true); + } + + /** + * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within + * the namespace if cascade is true. This interface implementation supports the Spark 3.3+. + * + * <p>If the catalog implementation does not support this operation, it may throw {@link + * UnsupportedOperationException}. 
+ * + * @param namespace a multi-part namespace + * @param cascade When true, deletes all objects under the namespace + * @return true if the namespace was dropped + * @throws UnsupportedOperationException If drop is not a supported operation + */ + public boolean dropNamespace(String[] namespace, boolean cascade) { throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet."); } diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java index abae4e2d..8ccea4f8 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java @@ -73,29 +73,29 @@ public class SparkTypeTest { String nestedRowMapType = "StructField(locations,MapType(" + "StringType," - + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)"; + + "StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)"; String expected = "StructType(" - + "StructField(id,IntegerType,false), " - + "StructField(name,StringType,true), " - + "StructField(salary,DoubleType,false), " + + "StructField(id,IntegerType,false)," + + "StructField(name,StringType,true)," + + "StructField(salary,DoubleType,false)," + nestedRowMapType - + ", " - + "StructField(strArray,ArrayType(StringType,true),true), " - + "StructField(intArray,ArrayType(IntegerType,true),true), " - + "StructField(boolean,BooleanType,true), " - + "StructField(tinyint,ByteType,true), " - + "StructField(smallint,ShortType,true), " - + "StructField(bigint,LongType,true), " - + "StructField(bytes,BinaryType,true), " - + "StructField(timestamp,TimestampType,true), " - + "StructField(date,DateType,true), " - + "StructField(decimal,DecimalType(2,2),true), " - + 
"StructField(decimal2,DecimalType(38,2),true), " + + "," + + "StructField(strArray,ArrayType(StringType,true),true)," + + "StructField(intArray,ArrayType(IntegerType,true),true)," + + "StructField(boolean,BooleanType,true)," + + "StructField(tinyint,ByteType,true)," + + "StructField(smallint,ShortType,true)," + + "StructField(bigint,LongType,true)," + + "StructField(bytes,BinaryType,true)," + + "StructField(timestamp,TimestampType,true)," + + "StructField(date,DateType,true)," + + "StructField(decimal,DecimalType(2,2),true)," + + "StructField(decimal2,DecimalType(38,2),true)," + "StructField(decimal3,DecimalType(10,1),true))"; StructType sparkType = fromFlinkRowType(ALL_TYPES); - assertThat(sparkType.toString()).isEqualTo(expected); + assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected); assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES); }