This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e99ffdb3 [hotfix] Minor code refactor: avoid casting in SparkWrite
e99ffdb3 is described below

commit e99ffdb3150f3d136fb82f69b611952f66e3e71a
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Dec 1 17:43:28 2022 +0800

    [hotfix] Minor code refactor: avoid casting in SparkWrite
    
    This closes #412
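    
    The pattern applied here is to validate and narrow the Table reference once
    at the boundary (in SparkTable), rather than repeating a (SupportsWrite) cast
    at every call site in SparkWrite and SparkWriteBuilder. A minimal sketch of
    the idea, using simplified stand-in interfaces rather than the actual
    flink-table-store classes:
    
        // Simplified stand-ins for illustration only, not the real classes.
        interface Table {}
    
        interface SupportsWrite extends Table {}
    
        class Boundary {
            // The instanceof check happens once here; downstream code can then
            // accept SupportsWrite directly and drop its repeated casts.
            static SupportsWrite castToWritable(Table table) {
                if (!(table instanceof SupportsWrite)) {
                    throw new UnsupportedOperationException(
                            "Unsupported table for writing: " + table.getClass());
                }
                return (SupportsWrite) table;
            }
        }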
---
 .../apache/flink/table/store/spark/SparkTable.java | 14 +++++++++++---
 .../apache/flink/table/store/spark/SparkWrite.java | 22 +++++++++-------------
 .../flink/table/store/spark/SparkWriteBuilder.java |  6 +++---
 3 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index f5e0dc60..426c7476 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -96,7 +96,7 @@ public class SparkTable
 
     @Override
     public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
-        return new SparkWriteBuilder(table, info.queryId(), lockFactory);
+        return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory);
     }
 
     @Override
@@ -112,7 +112,15 @@ public class SparkTable
         }
 
         String commitUser = UUID.randomUUID().toString();
-        ((org.apache.flink.table.store.table.SupportsWrite) table)
-                .deleteWhere(commitUser, predicates, lockFactory);
+        castToWritable(table).deleteWhere(commitUser, predicates, lockFactory);
+    }
+
+    private static org.apache.flink.table.store.table.SupportsWrite castToWritable(Table table) {
+        if (!(table instanceof org.apache.flink.table.store.table.SupportsWrite)) {
+            throw new UnsupportedOperationException(
+                    "Unsupported table for writing: " + table.getClass());
+        }
+
+        return (org.apache.flink.table.store.table.SupportsWrite) table;
     }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
index 0e85c46a..504dfd7e 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.spark;
 
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.SupportsWrite;
-import org.apache.flink.table.store.table.Table;
 import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.SerializableCommittable;
@@ -41,14 +40,11 @@ import java.util.stream.Collectors;
 /** Spark {@link V1Write}, it is required to use v1 write for grouping by bucket. */
 public class SparkWrite implements V1Write {
 
-    private final Table table;
+    private final SupportsWrite table;
     private final String queryId;
     private final Lock.Factory lockFactory;
 
-    public SparkWrite(Table table, String queryId, Lock.Factory lockFactory) {
-        if (!(table instanceof SupportsWrite)) {
-            throw new UnsupportedOperationException("Unsupported table: " + table.getClass());
-        }
+    public SparkWrite(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
         this.table = table;
         this.queryId = queryId;
         this.lockFactory = lockFactory;
@@ -69,7 +65,7 @@ public class SparkWrite implements V1Write {
                             .values()
                             .reduce(new ListConcat<>());
             try (TableCommit tableCommit =
-                    ((SupportsWrite) table).newCommit(queryId).withLock(lockFactory.create())) {
+                    table.newCommit(queryId).withLock(lockFactory.create())) {
                 tableCommit.commit(
                         identifier,
                         committables.stream()
@@ -83,19 +79,19 @@ public class SparkWrite implements V1Write {
 
     private static class ComputeBucket implements Function<Row, Integer> {
 
-        private final Table table;
+        private final SupportsWrite table;
         private final RowType type;
 
         private transient BucketComputer lazyComputer;
 
-        private ComputeBucket(Table table) {
+        private ComputeBucket(SupportsWrite table) {
             this.table = table;
             this.type = table.rowType();
         }
 
         private BucketComputer computer() {
             if (lazyComputer == null) {
-                lazyComputer = ((SupportsWrite) table).bucketComputer();
+                lazyComputer = table.bucketComputer();
             }
             return lazyComputer;
         }
@@ -109,12 +105,12 @@ public class SparkWrite implements V1Write {
     private static class WriteRecords
             implements Function<Iterable<Row>, List<SerializableCommittable>> {
 
-        private final Table table;
+        private final SupportsWrite table;
         private final RowType type;
         private final String queryId;
         private final long commitIdentifier;
 
-        private WriteRecords(Table table, String queryId, long commitIdentifier) {
+        private WriteRecords(SupportsWrite table, String queryId, long commitIdentifier) {
             this.table = table;
             this.type = table.rowType();
             this.queryId = queryId;
@@ -123,7 +119,7 @@ public class SparkWrite implements V1Write {
 
         @Override
         public List<SerializableCommittable> call(Iterable<Row> iterables) throws Exception {
-            try (TableWrite write = ((SupportsWrite) table).newWrite(queryId)) {
+            try (TableWrite write = table.newWrite(queryId)) {
                 for (Row row : iterables) {
                     write.write(new SparkRowData(type, row));
                 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
index 4f761692..aef473f0 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.SupportsWrite;
 
 import org.apache.spark.sql.connector.write.Write;
 import org.apache.spark.sql.connector.write.WriteBuilder;
@@ -31,11 +31,11 @@ import org.apache.spark.sql.connector.write.WriteBuilder;
  */
 public class SparkWriteBuilder implements WriteBuilder {
 
-    private final Table table;
+    private final SupportsWrite table;
     private final String queryId;
     private final Lock.Factory lockFactory;
 
-    public SparkWriteBuilder(Table table, String queryId, Lock.Factory lockFactory) {
+    public SparkWriteBuilder(SupportsWrite table, String queryId, Lock.Factory lockFactory) {
         this.table = table;
         this.queryId = queryId;
         this.lockFactory = lockFactory;
