This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cecf5d3f [flink] Use Paimon format table read for flink (#6246)
a3cecf5d3f is described below

commit a3cecf5d3f06e408b05bfe77971d9b99203de4d4
Author: jerry <[email protected]>
AuthorDate: Sat Sep 13 16:37:15 2025 +0800

    [flink] Use Paimon format table read for flink (#6246)
---
 .../paimon/flink/AbstractFlinkTableFactory.java    |  41 ++++----
 .../org/apache/paimon/flink/FlinkTableFactory.java |   4 -
 .../apache/paimon/flink/FormatCatalogTable.java    |  12 ---
 .../apache/paimon/flink/RESTCatalogITCaseBase.java |   2 +-
 .../flink/source/FormatTableReadITCaseBase.java    | 113 +++++++++++++++++++++
 5 files changed, 136 insertions(+), 36 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 7ff5b31c99..871b1a657f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -206,32 +206,35 @@ public abstract class AbstractFlinkTableFactory
         // dynamic options should override origin options
         newOptions.putAll(dynamicOptions);
 
-        FileStoreTable fileStoreTable;
-        if (origin instanceof DataCatalogTable) {
-            fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
-        } else if (flinkCatalog == null) {
-            // In case Paimon is directly used as a Flink connector, instead of through catalog.
-            fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
+        Table table;
+        if (origin instanceof FormatCatalogTable) {
+            table = ((FormatCatalogTable) origin).table();
         } else {
-            // In cases like materialized table, the Paimon table might not be DataCatalogTable,
-            // but can still be acquired through the catalog.
-            Identifier identifier =
-                    Identifier.create(
-                            context.getObjectIdentifier().getDatabaseName(),
-                            context.getObjectIdentifier().getObjectName());
-            try {
-                fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
-            } catch (Catalog.TableNotExistException e) {
-                throw new RuntimeException(e);
+            FileStoreTable fileStoreTable;
+            if (origin instanceof DataCatalogTable) {
+                fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
+            } else if (flinkCatalog == null) {
+                // In case Paimon is directly used as a Flink connector, instead of through catalog.
+                fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
+            } else {
+                // In cases like materialized table, the Paimon table might not be DataCatalogTable,
+                // but can still be acquired through the catalog.
+                Identifier identifier =
+                        Identifier.create(
+                                context.getObjectIdentifier().getDatabaseName(),
+                                context.getObjectIdentifier().getObjectName());
+                try {
+                    fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
+                } catch (Catalog.TableNotExistException e) {
+                    throw new RuntimeException(e);
+                }
             }
+            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
         }
-        FileStoreTable table = fileStoreTable.copyWithoutTimeTravel(newOptions);
-
         if (Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) {
             Map<String, String> runtimeContext = getAllOptions(context);
             table.fileIO().setRuntimeContext(runtimeContext);
         }
-
         // notice that the Paimon table schema must be the same with the Flink's
         Schema schema = FlinkCatalog.fromCatalogTable(context.getCatalogTable());
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index d5c1ed043b..bd24ef7dac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -52,10 +52,6 @@ public class FlinkTableFactory extends AbstractFlinkTableFactory {
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        CatalogTable table = context.getCatalogTable().getOrigin();
-        if (table instanceof FormatCatalogTable) {
-            return ((FormatCatalogTable) table).createTableSource(context);
-        }
         createTableIfNeeded(context);
         return super.createDynamicTableSource(context);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 34df2f566d..7ee09b27ec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.logical.RowType;
@@ -128,17 +127,6 @@ public class FormatCatalogTable implements CatalogTable {
         return getDescription();
     }
 
-    public DynamicTableSource createTableSource(DynamicTableFactory.Context context) {
-        return FactoryUtil.createDynamicTableSource(
-                null,
-                context.getObjectIdentifier(),
-                context.getCatalogTable(),
-                new HashMap<>(),
-                context.getConfiguration(),
-                context.getClassLoader(),
-                context.isTemporary());
-    }
-
     public DynamicTableSink createTableSink(DynamicTableFactory.Context context) {
         return FactoryUtil.createDynamicTableSink(
                 null,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
index 7da91520dd..adfa219a65 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCaseBase.java
@@ -50,7 +50,7 @@ public abstract class RESTCatalogITCaseBase extends CatalogITCaseBase {
     protected RESTCatalogServer restCatalogServer;
 
     private String serverUrl;
-    private String dataPath;
+    protected String dataPath;
     protected String warehouse;
     @TempDir java.nio.file.Path tempFile;
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java
new file mode 100644
index 0000000000..2cf159cfb3
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
+import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTToken;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for format table. */
+public class FormatTableReadITCaseBase extends RESTCatalogITCaseBase {
+
+    @Test
+    public void testParquetFileFormat() throws IOException {
+        FileFormatFactory formatFactory = new ParquetFileFormatFactory();
+        RowType rowType =
+                RowType.builder()
+                        .field("a", DataTypes.INT())
+                        .field("b", DataTypes.INT())
+                        .field("c", DataTypes.INT())
+                        .build();
+        FormatWriterFactory factory =
+                (formatFactory.create(
+                                new FileFormatFactory.FormatContext(new Options(), 1024, 1024)))
+                        .createWriterFactory(rowType);
+        InternalRow[] datas = new InternalRow[2];
+        datas[0] = GenericRow.of(1, 1, 1);
+        datas[1] = GenericRow.of(2, 2, 2);
+        String tableName = "format_table_test";
+        sql(
+                "CREATE TABLE %s (a INT, b INT, c INT) WITH ('file.format'='parquet', 'type'='format-table')",
+                tableName);
+        write(
+                factory,
+                new Path(dataPath, String.format("default.db/%s/data-1.parquet", tableName)),
+                datas);
+        RESTToken expiredDataToken =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 1000_000);
+        Identifier identifier = Identifier.create("default", tableName);
+        restCatalogServer.setDataToken(identifier, expiredDataToken);
+        assertThat(sql("SELECT a FROM %s", tableName))
+                .containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+        sql("Drop TABLE %s", tableName);
+    }
+
+    protected void write(FormatWriterFactory factory, Path file, InternalRow... rows)
+            throws IOException {
+        FileIO fileIO = new ResolvingFileIO();
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, dataPath);
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+        fileIO.configure(catalogContext);
+        FormatWriter writer;
+        PositionOutputStream out = null;
+        if (factory instanceof SupportsDirectWrite) {
+            writer = ((SupportsDirectWrite) factory).create(fileIO, file, "zstd");
+        } else {
+            out = fileIO.newOutputStream(file, true);
+            writer = factory.create(out, "zstd");
+        }
+        for (InternalRow row : rows) {
+            writer.addElement(row);
+        }
+        writer.close();
+        if (out != null) {
+            out.close();
+        }
+    }
+}

Reply via email to