This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new e99ffdb3 [hotfix] Minor code refactor: avoid casting in SparkWrite e99ffdb3 is described below commit e99ffdb3150f3d136fb82f69b611952f66e3e71a Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Thu Dec 1 17:43:28 2022 +0800 [hotfix] Minor code refactor: avoid casting in SparkWrite This closes #412 --- .../apache/flink/table/store/spark/SparkTable.java | 14 +++++++++++--- .../apache/flink/table/store/spark/SparkWrite.java | 22 +++++++++------------- .../flink/table/store/spark/SparkWriteBuilder.java | 6 +++--- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java index f5e0dc60..426c7476 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java @@ -96,7 +96,7 @@ public class SparkTable @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return new SparkWriteBuilder(table, info.queryId(), lockFactory); + return new SparkWriteBuilder(castToWritable(table), info.queryId(), lockFactory); } @Override @@ -112,7 +112,15 @@ public class SparkTable } String commitUser = UUID.randomUUID().toString(); - ((org.apache.flink.table.store.table.SupportsWrite) table) - .deleteWhere(commitUser, predicates, lockFactory); + castToWritable(table).deleteWhere(commitUser, predicates, lockFactory); + } + + private static org.apache.flink.table.store.table.SupportsWrite castToWritable(Table table) { + if (!(table instanceof org.apache.flink.table.store.table.SupportsWrite)) { + throw new UnsupportedOperationException( + "Unsupported table for writing: " + table.getClass()); + } + + return (org.apache.flink.table.store.table.SupportsWrite) table; } } diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java index 0e85c46a..504dfd7e 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java @@ -20,7 +20,6 @@ package org.apache.flink.table.store.spark; import org.apache.flink.table.store.file.operation.Lock; import org.apache.flink.table.store.table.SupportsWrite; -import org.apache.flink.table.store.table.Table; import org.apache.flink.table.store.table.sink.BucketComputer; import org.apache.flink.table.store.table.sink.FileCommittable; import org.apache.flink.table.store.table.sink.SerializableCommittable; @@ -41,14 +40,11 @@ import java.util.stream.Collectors; /** Spark {@link V1Write}, it is required to use v1 write for grouping by bucket. */ public class SparkWrite implements V1Write { - private final Table table; + private final SupportsWrite table; private final String queryId; private final Lock.Factory lockFactory; - public SparkWrite(Table table, String queryId, Lock.Factory lockFactory) { - if (!(table instanceof SupportsWrite)) { - throw new UnsupportedOperationException("Unsupported table: " + table.getClass()); - } + public SparkWrite(SupportsWrite table, String queryId, Lock.Factory lockFactory) { this.table = table; this.queryId = queryId; this.lockFactory = lockFactory; @@ -69,7 +65,7 @@ public class SparkWrite implements V1Write { .values() .reduce(new ListConcat<>()); try (TableCommit tableCommit = - ((SupportsWrite) table).newCommit(queryId).withLock(lockFactory.create())) { + table.newCommit(queryId).withLock(lockFactory.create())) { tableCommit.commit( identifier, committables.stream() @@ -83,19 +79,19 @@ public class SparkWrite implements V1Write { private static class ComputeBucket implements Function<Row, Integer> { - private final Table table; + private final SupportsWrite table; private final RowType type; private transient BucketComputer lazyComputer; - private ComputeBucket(Table table) { + private ComputeBucket(SupportsWrite table) { this.table = table; this.type = table.rowType(); } private BucketComputer computer() { if (lazyComputer == null) { - lazyComputer = ((SupportsWrite) table).bucketComputer(); + lazyComputer = table.bucketComputer(); } return lazyComputer; } @@ -109,12 +105,12 @@ public class SparkWrite implements V1Write { private static class WriteRecords implements Function<Iterable<Row>, List<SerializableCommittable>> { - private final Table table; + private final SupportsWrite table; private final RowType type; private final String queryId; private final long commitIdentifier; - private WriteRecords(Table table, String queryId, long commitIdentifier) { + private WriteRecords(SupportsWrite table, String queryId, long commitIdentifier) { this.table = table; this.type = table.rowType(); this.queryId = queryId; @@ -123,7 +119,7 @@ public class SparkWrite implements V1Write { @Override public List<SerializableCommittable> call(Iterable<Row> iterables) throws Exception { - try (TableWrite write = ((SupportsWrite) table).newWrite(queryId)) { + try (TableWrite write = table.newWrite(queryId)) { for (Row row : iterables) { write.write(new SparkRowData(type, row)); } diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java index 4f761692..aef473f0 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java @@ -19,7 +19,7 @@ package org.apache.flink.table.store.spark; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.table.SupportsWrite; import org.apache.spark.sql.connector.write.Write; import org.apache.spark.sql.connector.write.WriteBuilder; @@ -31,11 +31,11 @@ import org.apache.spark.sql.connector.write.WriteBuilder; */ public class SparkWriteBuilder implements WriteBuilder { - private final Table table; + private final SupportsWrite table; private final String queryId; private final Lock.Factory lockFactory; - public SparkWriteBuilder(Table table, String queryId, Lock.Factory lockFactory) { + public SparkWriteBuilder(SupportsWrite table, String queryId, Lock.Factory lockFactory) { this.table = table; this.queryId = queryId; this.lockFactory = lockFactory;