This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource
15ddb221 is described below

commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443
Author: Nicholas Jiang <programg...@163.com>
AuthorDate: Fri Jul 15 15:02:59 2022 +0800

    [FLINK-28560] Support Spark 3.3 profile for SparkSource
    
    This closes #217
---
 docs/content/docs/engines/overview.md              |  1 +
 flink-table-store-spark/pom.xml                    |  6 ++++
 .../flink/table/store/spark/SparkCatalog.java      | 28 +++++++++++++++++-
 .../flink/table/store/spark/SparkTypeTest.java     | 34 +++++++++++-----------
 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index dc4a3f1c..0bbc14ad 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -41,5 +41,6 @@ Apache Hive and Apache Spark.
 | Spark     | 3.0      | read                                                 | Projection, Filter |
 | Spark     | 3.1      | read                                                 | Projection, Filter |
 | Spark     | 3.2      | read                                                 | Projection, Filter |
+| Spark     | 3.3      | read                                                 | Projection, Filter |
 | Trino     | 358      | read                                                 | Projection, Filter |
 | Trino     | 388      | read                                                 | Projection, Filter |
\ No newline at end of file
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 66609af6..1a29f56a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -76,6 +76,12 @@ under the License.
 
     <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
     <profiles>
+        <profile>
+            <id>spark-3.3</id>
+            <properties>
+                <spark.version>3.3.0</spark.version>
+            </properties>
+        </profile>
         <profile>
             <id>spark-3.2</id>
             <properties>
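
For context, the comment in the profiles section notes that a Spark version is chosen at build time with -Pspark-x.x. With this change, a build and test run against Spark 3.3 would presumably be invoked along these lines (illustrative command, not part of the commit):

    mvn clean install -Pspark-3.3
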
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index b53cb33f..65f1387b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
     }
 
-    @Override
+    /**
+     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+     * This interface implementation only supports Spark 3.0, 3.1 and 3.2.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
     public boolean dropNamespace(String[] namespace) {
+        return dropNamespace(namespace, true);
+    }
+
+    /**
+     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+     * the namespace if cascade is true. This interface implementation supports Spark 3.3+.
+     *
+     * <p>If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @param cascade When true, deletes all objects under the namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
+    public boolean dropNamespace(String[] namespace, boolean cascade) {
         throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
     }
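
The two dropNamespace overloads above are the heart of the cross-version support: Spark 3.2 and earlier declare dropNamespace(String[]) on SupportsNamespaces, while Spark 3.3 replaced it with dropNamespace(String[], boolean cascade). Dropping @Override from the old signature and forwarding it to the new one lets the same source compile against both APIs. A minimal standalone sketch of the pattern (plain Java with no Spark dependency; the class name and method bodies are invented for illustration):

    // Hypothetical stand-in for SparkCatalog, stripped of the Spark interfaces
    // so it compiles on its own; it only demonstrates the two-overload pattern.
    public class VersionTolerantCatalog {

        // Matches the SupportsNamespaces signature in Spark <= 3.2. No @Override,
        // so this still compiles against Spark 3.3, where the single-argument
        // method no longer exists on the interface.
        public boolean dropNamespace(String[] namespace) {
            // Pre-3.3 Spark always dropped recursively, hence cascade = true.
            return dropNamespace(namespace, true);
        }

        // Matches the signature introduced in Spark 3.3, which added an explicit
        // cascade flag. When built against Spark <= 3.2 this is simply an extra
        // public method that Spark never calls.
        public boolean dropNamespace(String[] namespace, boolean cascade) {
            throw new UnsupportedOperationException("Drop namespace is not supported yet.");
        }
    }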
 
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index abae4e2d..8ccea4f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -73,29 +73,29 @@ public class SparkTypeTest {
         String nestedRowMapType =
                 "StructField(locations,MapType("
                         + "StringType,"
-                        + "StructType(StructField(posX,DoubleType,false), StructField(posY,DoubleType,false)),true),true)";
+                        + "StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)";
         String expected =
                 "StructType("
-                        + "StructField(id,IntegerType,false), "
-                        + "StructField(name,StringType,true), "
-                        + "StructField(salary,DoubleType,false), "
+                        + "StructField(id,IntegerType,false),"
+                        + "StructField(name,StringType,true),"
+                        + "StructField(salary,DoubleType,false),"
                         + nestedRowMapType
-                        + ", "
-                        + "StructField(strArray,ArrayType(StringType,true),true), "
-                        + "StructField(intArray,ArrayType(IntegerType,true),true), "
-                        + "StructField(boolean,BooleanType,true), "
-                        + "StructField(tinyint,ByteType,true), "
-                        + "StructField(smallint,ShortType,true), "
-                        + "StructField(bigint,LongType,true), "
-                        + "StructField(bytes,BinaryType,true), "
-                        + "StructField(timestamp,TimestampType,true), "
-                        + "StructField(date,DateType,true), "
-                        + "StructField(decimal,DecimalType(2,2),true), "
-                        + "StructField(decimal2,DecimalType(38,2),true), "
+                        + ","
+                        + "StructField(strArray,ArrayType(StringType,true),true),"
+                        + "StructField(intArray,ArrayType(IntegerType,true),true),"
+                        + "StructField(boolean,BooleanType,true),"
+                        + "StructField(tinyint,ByteType,true),"
+                        + "StructField(smallint,ShortType,true),"
+                        + "StructField(bigint,LongType,true),"
+                        + "StructField(bytes,BinaryType,true),"
+                        + "StructField(timestamp,TimestampType,true),"
+                        + "StructField(date,DateType,true),"
+                        + "StructField(decimal,DecimalType(2,2),true),"
+                        + "StructField(decimal2,DecimalType(38,2),true),"
                         + "StructField(decimal3,DecimalType(10,1),true))";
 
         StructType sparkType = fromFlinkRowType(ALL_TYPES);
-        assertThat(sparkType.toString()).isEqualTo(expected);
+        assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected);
 
         assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
     }
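
The expected string above was changed to omit the space after each separator, and the assertion now normalizes the actual output with replace(", ", ","), apparently because StructType.toString() renders field separators differently across Spark versions (", " on older versions, "," on newer ones). Normalizing both sides lets the same assertion pass across the whole 3.0-3.3 profile matrix. A self-contained illustration of the normalization (plain Java; the sample strings are made up):

    public class SeparatorNormalizationDemo {
        public static void main(String[] args) {
            // Two hypothetical renderings of the same schema, differing only
            // in whether a space follows the comma between fields.
            String withSpace =
                    "StructType(StructField(id,IntegerType,false), StructField(name,StringType,true))";
            String withoutSpace =
                    "StructType(StructField(id,IntegerType,false),StructField(name,StringType,true))";

            // Collapsing ", " to "," makes both renderings compare equal.
            System.out.println(withSpace.replace(", ", ",").equals(withoutSpace)); // prints true
        }
    }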
