This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a3cecf5d3f [flink] Use Paimon format table read for flink (#6246)
a3cecf5d3f is described below
commit a3cecf5d3f06e408b05bfe77971d9b99203de4d4
Author: jerry <[email protected]>
AuthorDate: Sat Sep 13 16:37:15 2025 +0800
[flink] Use Paimon format table read for flink (#6246)
---
.../paimon/flink/AbstractFlinkTableFactory.java | 41 ++++----
.../org/apache/paimon/flink/FlinkTableFactory.java | 4 -
.../apache/paimon/flink/FormatCatalogTable.java | 12 ---
.../apache/paimon/flink/RESTCatalogITCaseBase.java | 2 +-
.../flink/source/FormatTableReadITCaseBase.java | 113 +++++++++++++++++++++
5 files changed, 136 insertions(+), 36 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 7ff5b31c99..871b1a657f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -206,32 +206,35 @@ public abstract class AbstractFlinkTableFactory
// dynamic options should override origin options
newOptions.putAll(dynamicOptions);
- FileStoreTable fileStoreTable;
- if (origin instanceof DataCatalogTable) {
- fileStoreTable = (FileStoreTable) ((DataCatalogTable)
origin).table();
- } else if (flinkCatalog == null) {
- // In case Paimon is directly used as a Flink connector, instead
of through catalog.
- fileStoreTable =
FileStoreTableFactory.create(createCatalogContext(context));
+ Table table;
+ if (origin instanceof FormatCatalogTable) {
+ table = ((FormatCatalogTable) origin).table();
} else {
- // In cases like materialized table, the Paimon table might not be
DataCatalogTable,
- // but can still be acquired through the catalog.
- Identifier identifier =
- Identifier.create(
- context.getObjectIdentifier().getDatabaseName(),
- context.getObjectIdentifier().getObjectName());
- try {
- fileStoreTable = (FileStoreTable)
flinkCatalog.catalog().getTable(identifier);
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
+ FileStoreTable fileStoreTable;
+ if (origin instanceof DataCatalogTable) {
+ fileStoreTable = (FileStoreTable) ((DataCatalogTable)
origin).table();
+ } else if (flinkCatalog == null) {
+ // In case Paimon is directly used as a Flink connector,
instead of through catalog.
+ fileStoreTable =
FileStoreTableFactory.create(createCatalogContext(context));
+ } else {
+ // In cases like materialized table, the Paimon table might
not be DataCatalogTable,
+ // but can still be acquired through the catalog.
+ Identifier identifier =
+ Identifier.create(
+
context.getObjectIdentifier().getDatabaseName(),
+ context.getObjectIdentifier().getObjectName());
+ try {
+ fileStoreTable = (FileStoreTable)
flinkCatalog.catalog().getTable(identifier);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
}
+ table = fileStoreTable.copyWithoutTimeTravel(newOptions);
}
- FileStoreTable table =
fileStoreTable.copyWithoutTimeTravel(newOptions);
-
if
(Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) {
Map<String, String> runtimeContext = getAllOptions(context);
table.fileIO().setRuntimeContext(runtimeContext);
}
-
// notice that the Paimon table schema must be the same with the
Flink's
Schema schema =
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index d5c1ed043b..bd24ef7dac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -52,10 +52,6 @@ public class FlinkTableFactory extends AbstractFlinkTableFactory {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
- CatalogTable table = context.getCatalogTable().getOrigin();
- if (table instanceof FormatCatalogTable) {
- return ((FormatCatalogTable) table).createTableSource(context);
- }
createTableIfNeeded(context);
return super.createDynamicTableSource(context);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 34df2f566d..7ee09b27ec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.FormatTable;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.RowType;
@@ -128,17 +127,6 @@ public class FormatCatalogTable implements CatalogTable {
return getDescription();
}
- public DynamicTableSource createTableSource(DynamicTableFactory.Context
context) {
- return FactoryUtil.createDynamicTableSource(
- null,
- context.getObjectIdentifier(),
- context.getCatalogTable(),
- new HashMap<>(),
- context.getConfiguration(),
- context.getClassLoader(),
- context.isTemporary());
- }
-
public DynamicTableSink createTableSink(DynamicTableFactory.Context
context) {
return FactoryUtil.createDynamicTableSink(
null,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
index 7da91520dd..adfa219a65 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
@@ -50,7 +50,7 @@ public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
protected RESTCatalogServer restCatalogServer;
private String serverUrl;
- private String dataPath;
+ protected String dataPath;
protected String warehouse;
@TempDir java.nio.file.Path tempFile;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java
new file mode 100644
index 0000000000..2cf159cfb3
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
+import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTToken;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for format table. */
+public class FormatTableReadITCaseBase extends RESTCatalogITCaseBase {
+
+ @Test
+ public void testParquetFileFormat() throws IOException {
+ FileFormatFactory formatFactory = new ParquetFileFormatFactory();
+ RowType rowType =
+ RowType.builder()
+ .field("a", DataTypes.INT())
+ .field("b", DataTypes.INT())
+ .field("c", DataTypes.INT())
+ .build();
+ FormatWriterFactory factory =
+ (formatFactory.create(
+ new FileFormatFactory.FormatContext(new
Options(), 1024, 1024)))
+ .createWriterFactory(rowType);
+ InternalRow[] datas = new InternalRow[2];
+ datas[0] = GenericRow.of(1, 1, 1);
+ datas[1] = GenericRow.of(2, 2, 2);
+ String tableName = "format_table_test";
+ sql(
+ "CREATE TABLE %s (a INT, b INT, c INT) WITH
('file.format'='parquet', 'type'='format-table')",
+ tableName);
+ write(
+ factory,
+ new Path(dataPath,
String.format("default.db/%s/data-1.parquet", tableName)),
+ datas);
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of(
+ "akId", "akId-expire", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 1000_000);
+ Identifier identifier = Identifier.create("default", tableName);
+ restCatalogServer.setDataToken(identifier, expiredDataToken);
+ assertThat(sql("SELECT a FROM %s", tableName))
+ .containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+ sql("Drop TABLE %s", tableName);
+ }
+
+ protected void write(FormatWriterFactory factory, Path file,
InternalRow... rows)
+ throws IOException {
+ FileIO fileIO = new ResolvingFileIO();
+ Options catalogOptions = new Options();
+ catalogOptions.set(CatalogOptions.WAREHOUSE, dataPath);
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+ fileIO.configure(catalogContext);
+ FormatWriter writer;
+ PositionOutputStream out = null;
+ if (factory instanceof SupportsDirectWrite) {
+ writer = ((SupportsDirectWrite) factory).create(fileIO, file,
"zstd");
+ } else {
+ out = fileIO.newOutputStream(file, true);
+ writer = factory.create(out, "zstd");
+ }
+ for (InternalRow row : rows) {
+ writer.addElement(row);
+ }
+ writer.close();
+ if (out != null) {
+ out.close();
+ }
+ }
+}