This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 16a405882c [core] Improve object table for fileIO, Privileged and parent_path (#4575)
16a405882c is described below

commit 16a405882cd8f0ca9afbe8566b919aef930144ef
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 22 19:14:47 2024 +0800

    [core] Improve object table for fileIO, Privileged and parent_path (#4575)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 46 ++++++++---
 .../apache/paimon/privilege/PrivilegedCatalog.java |  2 +-
 .../paimon/privilege/PrivilegedFileStoreTable.java | 78 ++++++++++--------
 .../paimon/privilege/PrivilegedObjectTable.java    | 92 ++++++++++++++++++++++
 .../apache/paimon/table/FileStoreTableFactory.java | 16 +---
 .../apache/paimon/table/object/ObjectRefresh.java  | 11 +--
 .../apache/paimon/table/object/ObjectTable.java    | 55 ++++++++++---
 .../org/apache/paimon/flink/ObjectTableITCase.java | 27 +++++++
 8 files changed, 252 insertions(+), 75 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index fff593aabb..d3a8d628a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -66,6 +67,7 @@ import static 
org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
@@ -430,17 +432,39 @@ public abstract class AbstractCatalog implements Catalog {
     protected Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
         Preconditions.checkArgument(identifier.getSystemTableName() == null);
         TableMeta tableMeta = getDataTableMeta(identifier);
-        return FileStoreTableFactory.create(
-                fileIO,
-                getTableLocation(identifier),
-                tableMeta.schema,
-                new CatalogEnvironment(
-                        identifier,
-                        tableMeta.uuid,
-                        Lock.factory(
-                                lockFactory().orElse(null), 
lockContext().orElse(null), identifier),
-                        metastoreClientFactory(identifier, 
tableMeta.schema).orElse(null),
-                        lineageMetaFactory));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        fileIO,
+                        getTableLocation(identifier),
+                        tableMeta.schema,
+                        new CatalogEnvironment(
+                                identifier,
+                                tableMeta.uuid,
+                                Lock.factory(
+                                        lockFactory().orElse(null),
+                                        lockContext().orElse(null),
+                                        identifier),
+                                metastoreClientFactory(identifier, 
tableMeta.schema).orElse(null),
+                                lineageMetaFactory));
+        CoreOptions options = table.coreOptions();
+        if (options.type() == TableType.OBJECT_TABLE) {
+            String objectLocation = options.objectLocation();
+            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
+            table =
+                    ObjectTable.builder()
+                            .underlyingTable(table)
+                            .objectLocation(objectLocation)
+                            .objectFileIO(objectFileIO(objectLocation))
+                            .build();
+        }
+        return table;
+    }
+
+    /**
+     * Catalog implementation may override this method to provide {@link 
FileIO} to object table.
+     */
+    protected FileIO objectFileIO(String objectLocation) {
+        return fileIO;
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index c9b9c21937..2e88213a24 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -127,7 +127,7 @@ public class PrivilegedCatalog extends DelegateCatalog {
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         Table table = wrapped.getTable(identifier);
         if (table instanceof FileStoreTable) {
-            return new PrivilegedFileStoreTable(
+            return PrivilegedFileStoreTable.wrap(
                     (FileStoreTable) table, 
privilegeManager.getPrivilegeChecker(), identifier);
         } else {
             return table;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 37990ed5a1..52c806c7c5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.DelegatedFileStoreTable;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.query.LocalTableQuery;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
@@ -48,10 +49,10 @@ import java.util.OptionalLong;
 /** {@link FileStoreTable} with privilege checks. */
 public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
 
-    private final PrivilegeChecker privilegeChecker;
-    private final Identifier identifier;
+    protected final PrivilegeChecker privilegeChecker;
+    protected final Identifier identifier;
 
-    public PrivilegedFileStoreTable(
+    protected PrivilegedFileStoreTable(
             FileStoreTable wrapped, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
         super(wrapped);
         this.privilegeChecker = privilegeChecker;
@@ -106,18 +107,6 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
         return wrapped.statistics();
     }
 
-    @Override
-    public FileStoreTable copy(Map<String, String> dynamicOptions) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copy(dynamicOptions), privilegeChecker, identifier);
-    }
-
-    @Override
-    public FileStoreTable copy(TableSchema newTableSchema) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copy(newTableSchema), privilegeChecker, identifier);
-    }
-
     @Override
     public void rollbackTo(long snapshotId) {
         privilegeChecker.assertCanInsert(identifier);
@@ -202,18 +191,6 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
         return wrapped.newExpireChangelog();
     }
 
-    @Override
-    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copyWithoutTimeTravel(dynamicOptions), 
privilegeChecker, identifier);
-    }
-
-    @Override
-    public FileStoreTable copyWithLatestSchema() {
-        return new PrivilegedFileStoreTable(
-                wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
-    }
-
     @Override
     public DataTableScan newScan() {
         privilegeChecker.assertCanSelect(identifier);
@@ -262,11 +239,7 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
         return wrapped.newLocalTableQuery();
     }
 
-    @Override
-    public FileStoreTable switchToBranch(String branchName) {
-        return new PrivilegedFileStoreTable(
-                wrapped.switchToBranch(branchName), privilegeChecker, 
identifier);
-    }
+    // ======================= equals ============================
 
     @Override
     public boolean equals(Object o) {
@@ -281,4 +254,45 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
                 && Objects.equals(privilegeChecker, that.privilegeChecker)
                 && Objects.equals(identifier, that.identifier);
     }
+
+    // ======================= copy ============================
+
+    @Override
+    public PrivilegedFileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copy(dynamicOptions), privilegeChecker, identifier);
+    }
+
+    @Override
+    public PrivilegedFileStoreTable copy(TableSchema newTableSchema) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copy(newTableSchema), privilegeChecker, identifier);
+    }
+
+    @Override
+    public PrivilegedFileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new PrivilegedFileStoreTable(
+                wrapped.copyWithoutTimeTravel(dynamicOptions), 
privilegeChecker, identifier);
+    }
+
+    @Override
+    public PrivilegedFileStoreTable copyWithLatestSchema() {
+        return new PrivilegedFileStoreTable(
+                wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
+    }
+
+    @Override
+    public PrivilegedFileStoreTable switchToBranch(String branchName) {
+        return new PrivilegedFileStoreTable(
+                wrapped.switchToBranch(branchName), privilegeChecker, 
identifier);
+    }
+
+    public static PrivilegedFileStoreTable wrap(
+            FileStoreTable table, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
+        if (table instanceof ObjectTable) {
+            return new PrivilegedObjectTable((ObjectTable) table, 
privilegeChecker, identifier);
+        } else {
+            return new PrivilegedFileStoreTable(table, privilegeChecker, 
identifier);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java
new file mode 100644
index 0000000000..c5a319c1fe
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.object.ObjectTable;
+
+import java.util.Map;
+
+/** A {@link PrivilegedFileStoreTable} for {@link ObjectTable}. */
+public class PrivilegedObjectTable extends PrivilegedFileStoreTable implements 
ObjectTable {
+
+    private final ObjectTable objectTable;
+
+    protected PrivilegedObjectTable(
+            ObjectTable wrapped, PrivilegeChecker privilegeChecker, Identifier 
identifier) {
+        super(wrapped, privilegeChecker, identifier);
+        this.objectTable = wrapped;
+    }
+
+    @Override
+    public String objectLocation() {
+        return objectTable.objectLocation();
+    }
+
+    @Override
+    public FileStoreTable underlyingTable() {
+        return objectTable.underlyingTable();
+    }
+
+    @Override
+    public FileIO objectFileIO() {
+        return objectTable.objectFileIO();
+    }
+
+    @Override
+    public long refresh() {
+        privilegeChecker.assertCanInsert(identifier);
+        return objectTable.refresh();
+    }
+
+    // ======================= copy ============================
+
+    @Override
+    public PrivilegedObjectTable copy(Map<String, String> dynamicOptions) {
+        return new PrivilegedObjectTable(
+                objectTable.copy(dynamicOptions), privilegeChecker, 
identifier);
+    }
+
+    @Override
+    public PrivilegedObjectTable copy(TableSchema newTableSchema) {
+        return new PrivilegedObjectTable(
+                objectTable.copy(newTableSchema), privilegeChecker, 
identifier);
+    }
+
+    @Override
+    public PrivilegedObjectTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new PrivilegedObjectTable(
+                objectTable.copyWithoutTimeTravel(dynamicOptions), 
privilegeChecker, identifier);
+    }
+
+    @Override
+    public PrivilegedObjectTable copyWithLatestSchema() {
+        return new PrivilegedObjectTable(
+                objectTable.copyWithLatestSchema(), privilegeChecker, 
identifier);
+    }
+
+    @Override
+    public PrivilegedObjectTable switchToBranch(String branchName) {
+        return new PrivilegedObjectTable(
+                objectTable.switchToBranch(branchName), privilegeChecker, 
identifier);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 47d8777241..423dc17263 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -19,14 +19,12 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.IOException;
@@ -35,7 +33,6 @@ import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Factory to create {@link FileStoreTable}. */
 public class FileStoreTableFactory {
@@ -127,17 +124,6 @@ public class FileStoreTableFactory {
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment)
                         : new PrimaryKeyFileStoreTable(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment);
-        table = table.copy(dynamicOptions.toMap());
-        CoreOptions options = table.coreOptions();
-        if (options.type() == TableType.OBJECT_TABLE) {
-            String objectLocation = options.objectLocation();
-            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
-            table =
-                    ObjectTable.builder()
-                            .underlyingTable(table)
-                            .objectLocation(objectLocation)
-                            .build();
-        }
-        return table;
+        return table.copy(dynamicOptions.toMap());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
index 326efbc0ea..b1be840c51 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
@@ -26,7 +26,6 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -41,13 +40,14 @@ public class ObjectRefresh {
 
     public static long refresh(ObjectTable table) throws Exception {
         String location = table.objectLocation();
-        FileStoreTable underlyingTable = table.underlyingTable();
-        FileIO fileIO = underlyingTable.fileIO();
 
+        // 1. collect all files for object table
         List<FileStatus> fileCollector = new ArrayList<>();
-        listAllFiles(fileIO, new Path(location), fileCollector);
+        listAllFiles(table.objectFileIO(), new Path(location), fileCollector);
 
-        BatchWriteBuilder writeBuilder = 
underlyingTable.newBatchWriteBuilder().withOverwrite();
+        // 2. write to underlying table
+        BatchWriteBuilder writeBuilder =
+                table.underlyingTable().newBatchWriteBuilder().withOverwrite();
         try (BatchTableWrite write = writeBuilder.newWrite();
                 BatchTableCommit commit = writeBuilder.newCommit()) {
             for (FileStatus file : fileCollector) {
@@ -78,6 +78,7 @@ public class ObjectRefresh {
     private static InternalRow toRow(FileStatus file) {
         return toRow(
                 file.getPath().toString(),
+                file.getPath().getParent().toString(),
                 file.getPath().getName(),
                 file.getLen(),
                 Timestamp.fromEpochMillis(file.getModificationTime()),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 65689108ca..97acfe7299 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.object;
 
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DelegatedFileStoreTable;
@@ -46,6 +47,7 @@ public interface ObjectTable extends FileStoreTable {
     RowType SCHEMA =
             RowType.builder()
                     .field("path", DataTypes.STRING().notNull())
+                    .field("parent_path", DataTypes.STRING().notNull())
                     .field("name", DataTypes.STRING().notNull())
                     .field("length", DataTypes.BIGINT().notNull())
                     .field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS())
@@ -66,11 +68,26 @@ public interface ObjectTable extends FileStoreTable {
     /** Underlying table to store metadata. */
     FileStoreTable underlyingTable();
 
+    /** File io for object file system. */
+    FileIO objectFileIO();
+
     long refresh();
 
     @Override
     ObjectTable copy(Map<String, String> dynamicOptions);
 
+    @Override
+    ObjectTable copy(TableSchema newTableSchema);
+
+    @Override
+    ObjectTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
+
+    @Override
+    ObjectTable copyWithLatestSchema();
+
+    @Override
+    ObjectTable switchToBranch(String branchName);
+
     /** Create a new builder for {@link ObjectTable}. */
     static ObjectTable.Builder builder() {
         return new ObjectTable.Builder();
@@ -80,6 +97,7 @@ public interface ObjectTable extends FileStoreTable {
     class Builder {
 
         private FileStoreTable underlyingTable;
+        private FileIO objectFileIO;
         private String objectLocation;
 
         public ObjectTable.Builder underlyingTable(FileStoreTable 
underlyingTable) {
@@ -93,23 +111,31 @@ public interface ObjectTable extends FileStoreTable {
             return this;
         }
 
+        public ObjectTable.Builder objectFileIO(FileIO objectFileIO) {
+            this.objectFileIO = objectFileIO;
+            return this;
+        }
+
         public ObjectTable.Builder objectLocation(String objectLocation) {
             this.objectLocation = objectLocation;
             return this;
         }
 
         public ObjectTable build() {
-            return new ObjectTableImpl(underlyingTable, objectLocation);
+            return new ObjectTableImpl(underlyingTable, objectFileIO, 
objectLocation);
         }
     }
 
     /** An implementation for {@link ObjectTable}. */
     class ObjectTableImpl extends DelegatedFileStoreTable implements 
ObjectTable {
 
+        private final FileIO objectFileIO;
         private final String objectLocation;
 
-        public ObjectTableImpl(FileStoreTable underlyingTable, String 
objectLocation) {
+        public ObjectTableImpl(
+                FileStoreTable underlyingTable, FileIO objectFileIO, String 
objectLocation) {
             super(underlyingTable);
+            this.objectFileIO = objectFileIO;
             this.objectLocation = objectLocation;
         }
 
@@ -148,6 +174,11 @@ public interface ObjectTable extends FileStoreTable {
             return wrapped;
         }
 
+        @Override
+        public FileIO objectFileIO() {
+            return objectFileIO;
+        }
+
         @Override
         public long refresh() {
             try {
@@ -159,28 +190,30 @@ public interface ObjectTable extends FileStoreTable {
 
         @Override
         public ObjectTable copy(Map<String, String> dynamicOptions) {
-            return new ObjectTableImpl(wrapped.copy(dynamicOptions), 
objectLocation);
+            return new ObjectTableImpl(wrapped.copy(dynamicOptions), 
objectFileIO, objectLocation);
         }
 
         @Override
-        public FileStoreTable copy(TableSchema newTableSchema) {
-            return new ObjectTableImpl(wrapped.copy(newTableSchema), 
objectLocation);
+        public ObjectTable copy(TableSchema newTableSchema) {
+            return new ObjectTableImpl(wrapped.copy(newTableSchema), 
objectFileIO, objectLocation);
         }
 
         @Override
-        public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        public ObjectTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
             return new ObjectTableImpl(
-                    wrapped.copyWithoutTimeTravel(dynamicOptions), 
objectLocation);
+                    wrapped.copyWithoutTimeTravel(dynamicOptions), 
objectFileIO, objectLocation);
         }
 
         @Override
-        public FileStoreTable copyWithLatestSchema() {
-            return new ObjectTableImpl(wrapped.copyWithLatestSchema(), 
objectLocation);
+        public ObjectTable copyWithLatestSchema() {
+            return new ObjectTableImpl(
+                    wrapped.copyWithLatestSchema(), objectFileIO, 
objectLocation);
         }
 
         @Override
-        public FileStoreTable switchToBranch(String branchName) {
-            return new ObjectTableImpl(wrapped.switchToBranch(branchName), 
objectLocation);
+        public ObjectTable switchToBranch(String branchName) {
+            return new ObjectTableImpl(
+                    wrapped.switchToBranch(branchName), objectFileIO, 
objectLocation);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
index b9e30035b0..d3ad1d4a52 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
@@ -80,4 +80,31 @@ public class ObjectTableITCase extends CatalogITCaseBase {
                 .hasMessageContaining("Object table does not support Write.");
         assertThat(sql("SELECT name, length FROM 
T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
     }
+
+    @Test
+    public void testObjectTableRefreshInPrivileged() throws IOException {
+        sql("CALL sys.init_file_based_privilege('root-passwd')");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG rootcat WITH (\n"
+                                + "  'type' = 'paimon',\n"
+                                + "  'warehouse' = '%s',\n"
+                                + "  'user' = 'root',\n"
+                                + "  'password' = 'root-passwd'\n"
+                                + ")",
+                        path));
+        tEnv.useCatalog("rootcat");
+
+        Path objectLocation = new Path(path + "/object-location");
+        FileIO fileIO = LocalFileIO.create();
+        sql(
+                "CREATE TABLE T WITH ('type' = 'object-table', 
'object-location' = '%s')",
+                objectLocation);
+
+        // add new file
+        fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3");
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM 
T")).containsExactlyInAnyOrder(Row.of("f0", 5L));
+    }
 }

Reply via email to