This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 107dfa0104 [core] Extract loadTable in CatalogUtils (#4904)
107dfa0104 is described below

commit 107dfa0104728362ed838f9dcc8263084dd9772e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 14 15:54:17 2025 +0800

    [core] Extract loadTable in CatalogUtils (#4904)
---
 .../java/org/apache/paimon/options/Options.java    |   4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 122 +++----------------
 .../java/org/apache/paimon/catalog/Catalog.java    |   4 +
 .../org/apache/paimon/catalog/CatalogUtils.java    | 135 ++++++++++++++-------
 .../org/apache/paimon/catalog/DelegateCatalog.java |   6 +
 .../apache/paimon/catalog/FileSystemCatalog.java   |   2 +-
 .../org/apache/paimon/catalog/TableMetadata.java   |  49 ++++++++
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   |   5 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 103 +++++-----------
 .../apache/paimon/table/object/ObjectTable.java    |   2 +
 .../org/apache/paimon/catalog/CatalogTestBase.java |   4 +-
 .../org/apache/paimon/rest/TestRESTCatalog.java    |  12 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  82 +++++--------
 .../org/apache/paimon/hive/HiveTableUtils.java     |  34 +++---
 .../org/apache/paimon/hive/HiveCatalogTest.java    |   4 -
 15 files changed, 261 insertions(+), 307 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java 
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index d292fef3bf..46e70e22af 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -149,8 +149,8 @@ public class Options implements Serializable {
         return new Options(convertToPropertiesPrefixKey(data, prefix));
     }
 
-    public synchronized void remove(String key) {
-        data.remove(key);
+    public synchronized String remove(String key) {
+        return data.remove(key);
     }
 
     public synchronized void remove(ConfigOption<?> option) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index dedfd5f112..f766df0c5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -32,18 +31,13 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
-
-import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -55,13 +49,12 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
+import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static 
org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
-import static org.apache.paimon.catalog.CatalogUtils.getTableType;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static 
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
 import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
@@ -69,7 +62,6 @@ import static 
org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
@@ -100,6 +92,11 @@ public abstract class AbstractCatalog implements Catalog {
         return fileIO;
     }
 
+    @Override
+    public FileIO fileIO(Path path) {
+        return fileIO;
+    }
+
     public Optional<CatalogLockFactory> lockFactory() {
         if (!lockEnabled()) {
             return Optional.empty();
@@ -370,67 +367,9 @@ public abstract class AbstractCatalog implements Catalog {
 
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
-        if (isSystemDatabase(identifier.getDatabaseName())) {
-            return 
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
-        } else if (identifier.isSystemTable()) {
-            Table originTable =
-                    getDataOrFormatTable(
-                            new Identifier(
-                                    identifier.getDatabaseName(),
-                                    identifier.getTableName(),
-                                    identifier.getBranchName(),
-                                    null));
-            return CatalogUtils.createSystemTable(identifier, originTable);
-        } else {
-            return getDataOrFormatTable(identifier);
-        }
-    }
-
-    // hive override this method.
-    protected Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
-        Preconditions.checkArgument(identifier.getSystemTableName() == null);
-        TableMeta tableMeta = getDataTableMeta(identifier);
-        TableType tableType = getTableType(tableMeta.schema().options());
-        if (tableType == TableType.FORMAT_TABLE) {
-            TableSchema schema = tableMeta.schema();
-            return buildFormatTableByTableSchema(
-                    identifier,
-                    schema.options(),
-                    schema.logicalRowType(),
-                    schema.partitionKeys(),
-                    schema.comment());
-        }
-        FileStoreTable table =
-                FileStoreTableFactory.create(
-                        fileIO,
-                        getTableLocation(identifier),
-                        tableMeta.schema,
-                        new CatalogEnvironment(
-                                identifier,
-                                tableMeta.uuid,
-                                Lock.factory(
-                                        lockFactory().orElse(null),
-                                        lockContext().orElse(null),
-                                        identifier),
-                                catalogLoader()));
-        if (tableType == TableType.OBJECT_TABLE) {
-            String objectLocation = table.coreOptions().objectLocation();
-            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
-            table =
-                    ObjectTable.builder()
-                            .underlyingTable(table)
-                            .objectLocation(objectLocation)
-                            .objectFileIO(objectFileIO(objectLocation))
-                            .build();
-        }
-        return table;
-    }
-
-    /**
-     * Catalog implementation may override this method to provide {@link 
FileIO} to object table.
-     */
-    protected FileIO objectFileIO(String objectLocation) {
-        return fileIO;
+        Lock.Factory lockFactory =
+                Lock.factory(lockFactory().orElse(null), 
lockContext().orElse(null), identifier);
+        return CatalogUtils.loadTable(this, identifier, 
this::loadTableMetadata, lockFactory);
     }
 
     /**
@@ -455,11 +394,11 @@ public abstract class AbstractCatalog implements Catalog {
         return newDatabasePath(warehouse(), database);
     }
 
-    protected TableMeta getDataTableMeta(Identifier identifier) throws 
TableNotExistException {
-        return new TableMeta(getDataTableSchema(identifier), null);
+    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
+        return new TableMetadata(loadTableSchema(identifier), null);
     }
 
-    protected abstract TableSchema getDataTableSchema(Identifier identifier)
+    protected abstract TableSchema loadTableSchema(Identifier identifier)
             throws TableNotExistException;
 
     public Path getTableLocation(Identifier identifier) {
@@ -537,38 +476,17 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath, 
String branchName) {
-        return new SchemaManager(fileIO, tablePath, branchName)
-                .latest()
-                .map(
-                        s -> {
-                            if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+        Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath, 
branchName).latest();
+        if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+            schema =
+                    schema.map(
+                            s -> {
                                 Options branchOptions = new 
Options(s.options());
                                 branchOptions.set(CoreOptions.BRANCH, 
branchName);
                                 return s.copy(branchOptions.toMap());
-                            } else {
-                                return s;
-                            }
-                        });
-    }
-
-    /** Table metadata. */
-    protected static class TableMeta {
-
-        private final TableSchema schema;
-        @Nullable private final String uuid;
-
-        public TableMeta(TableSchema schema, @Nullable String uuid) {
-            this.schema = schema;
-            this.uuid = uuid;
-        }
-
-        public TableSchema schema() {
-            return schema;
-        }
-
-        @Nullable
-        public String uuid() {
-            return uuid;
+                            });
         }
+        schema.ifPresent(s -> s.options().put(PATH.key(), 
tablePath.toString()));
+        return schema;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d0ad86c224..a0d78b2688 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -374,6 +375,9 @@ public interface Catalog extends AutoCloseable {
     /** {@link FileIO} of this catalog. It can access {@link #warehouse()} 
path. */
     FileIO fileIO();
 
+    /** {@link FileIO} of this catalog. */
+    FileIO fileIO(Path path);
+
     /** Catalog options for re-creating this catalog. */
     Map<String, String> options();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 3c245040cc..3ceef461e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -20,18 +20,23 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.TableType;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.CatalogOptionsTable;
 import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 
@@ -42,6 +47,7 @@ import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
 import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
+import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -129,7 +135,76 @@ public class CatalogUtils {
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
 
-    public static Table createGlobalSystemTable(String tableName, Catalog 
catalog)
+    public static List<Partition> listPartitionsFromFileSystem(Table table) {
+        Options options = Options.fromMap(table.options());
+        InternalRowPartitionComputer computer =
+                new InternalRowPartitionComputer(
+                        options.get(PARTITION_DEFAULT_NAME),
+                        table.rowType().project(table.partitionKeys()),
+                        table.partitionKeys().toArray(new String[0]),
+                        options.get(PARTITION_GENERATE_LEGCY_NAME));
+        List<PartitionEntry> partitionEntries =
+                table.newReadBuilder().newScan().listPartitionEntries();
+        List<Partition> partitions = new ArrayList<>(partitionEntries.size());
+        for (PartitionEntry entry : partitionEntries) {
+            partitions.add(
+                    new Partition(
+                            computer.generatePartValues(entry.partition()),
+                            entry.recordCount(),
+                            entry.fileSizeInBytes(),
+                            entry.fileCount(),
+                            entry.lastFileCreationTime()));
+        }
+        return partitions;
+    }
+
+    /**
+     * Load table from {@link Catalog}, this table can be:
+     *
+     * <ul>
+     *   <li>1. Global System table: contains the statistical information of 
all the tables exists.
+     *   <li>2. Format table: refers to a directory that contains multiple 
files of the same format.
+     *   <li>3. Data table: Normal {@link FileStoreTable}, primary key table 
or append table.
+     *   <li>4. Object table: provides metadata indexes for unstructured data 
in the location.
+     *   <li>5. System table: wraps Data table or Object table, such as the 
snapshots created.
+     * </ul>
+     */
+    public static Table loadTable(
+            Catalog catalog,
+            Identifier identifier,
+            TableMetadata.Loader metadataLoader,
+            Lock.Factory lockFactory)
+            throws Catalog.TableNotExistException {
+        if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
+            return 
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
+        }
+
+        TableMetadata metadata = metadataLoader.load(identifier);
+        TableSchema schema = metadata.schema();
+        CoreOptions options = CoreOptions.fromMap(schema.options());
+        if (options.type() == TableType.FORMAT_TABLE) {
+            return toFormatTable(identifier, schema);
+        }
+
+        CatalogEnvironment catalogEnv =
+                new CatalogEnvironment(
+                        identifier, metadata.uuid(), lockFactory, 
catalog.catalogLoader());
+        Path path = new Path(schema.options().get(PATH.key()));
+        FileStoreTable table =
+                FileStoreTableFactory.create(catalog.fileIO(), path, schema, 
catalogEnv);
+
+        if (options.type() == TableType.OBJECT_TABLE) {
+            table = toObjectTable(catalog, table);
+        }
+
+        if (identifier.isSystemTable()) {
+            return CatalogUtils.createSystemTable(identifier, table);
+        }
+
+        return table;
+    }
+
+    private static Table createGlobalSystemTable(String tableName, Catalog 
catalog)
             throws Catalog.TableNotExistException {
         switch (tableName.toLowerCase()) {
             case ALL_TABLE_OPTIONS:
@@ -154,7 +229,7 @@ public class CatalogUtils {
         }
     }
 
-    public static Table createSystemTable(Identifier identifier, Table 
originTable)
+    private static Table createSystemTable(Identifier identifier, Table 
originTable)
             throws Catalog.TableNotExistException {
         if (!(originTable instanceof FileStoreTable)) {
             throw new UnsupportedOperationException(
@@ -172,41 +247,8 @@ public class CatalogUtils {
         return table;
     }
 
-    public static List<Partition> listPartitionsFromFileSystem(Table table) {
-        Options options = Options.fromMap(table.options());
-        InternalRowPartitionComputer computer =
-                new InternalRowPartitionComputer(
-                        options.get(PARTITION_DEFAULT_NAME),
-                        table.rowType().project(table.partitionKeys()),
-                        table.partitionKeys().toArray(new String[0]),
-                        options.get(PARTITION_GENERATE_LEGCY_NAME));
-        List<PartitionEntry> partitionEntries =
-                table.newReadBuilder().newScan().listPartitionEntries();
-        List<Partition> partitions = new ArrayList<>(partitionEntries.size());
-        for (PartitionEntry entry : partitionEntries) {
-            partitions.add(
-                    new Partition(
-                            computer.generatePartValues(entry.partition()),
-                            entry.recordCount(),
-                            entry.fileSizeInBytes(),
-                            entry.fileCount(),
-                            entry.lastFileCreationTime()));
-        }
-        return partitions;
-    }
-
-    public static TableType getTableType(Map<String, String> options) {
-        return options.containsKey(CoreOptions.TYPE.key())
-                ? TableType.fromString(options.get(CoreOptions.TYPE.key()))
-                : CoreOptions.TYPE.defaultValue();
-    }
-
-    public static FormatTable buildFormatTableByTableSchema(
-            Identifier identifier,
-            Map<String, String> options,
-            RowType rowType,
-            List<String> partitionKeys,
-            String comment) {
+    private static FormatTable toFormatTable(Identifier identifier, 
TableSchema schema) {
+        Map<String, String> options = schema.options();
         FormatTable.Format format =
                 FormatTable.parseFormat(
                         options.getOrDefault(
@@ -215,12 +257,23 @@ public class CatalogUtils {
         String location = options.get(CoreOptions.PATH.key());
         return FormatTable.builder()
                 .identifier(identifier)
-                .rowType(rowType)
-                .partitionKeys(partitionKeys)
+                .rowType(schema.logicalRowType())
+                .partitionKeys(schema.partitionKeys())
                 .location(location)
                 .format(format)
                 .options(options)
-                .comment(comment)
+                .comment(schema.comment())
+                .build();
+    }
+
+    private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable 
underlyingTable) {
+        CoreOptions options = underlyingTable.coreOptions();
+        String objectLocation = options.objectLocation();
+        FileIO objectFileIO = catalog.fileIO(new Path(objectLocation));
+        return ObjectTable.builder()
+                .underlyingTable(underlyingTable)
+                .objectLocation(objectLocation)
+                .objectFileIO(objectFileIO)
                 .build();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index aa7852456e..847485a7a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -61,6 +62,11 @@ public abstract class DelegateCatalog implements Catalog {
         return wrapped.fileIO();
     }
 
+    @Override
+    public FileIO fileIO(Path path) {
+        return wrapped.fileIO(path);
+    }
+
     @Override
     public List<String> listDatabases() {
         return wrapped.listDatabases();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 254826b91d..f8e9a2aabc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -103,7 +103,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+    public TableSchema loadTableSchema(Identifier identifier) throws 
TableNotExistException {
         return tableSchemaInFileSystem(
                         getTableLocation(identifier), 
identifier.getBranchNameOrDefault())
                 .orElseThrow(() -> new TableNotExistException(identifier));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java
new file mode 100644
index 0000000000..81904476a2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.schema.TableSchema;
+
+import javax.annotation.Nullable;
+
+/** Metadata for table. */
+public class TableMetadata {
+
+    private final TableSchema schema;
+    @Nullable private final String uuid;
+
+    public TableMetadata(TableSchema schema, @Nullable String uuid) {
+        this.schema = schema;
+        this.uuid = uuid;
+    }
+
+    public TableSchema schema() {
+        return schema;
+    }
+
+    @Nullable
+    public String uuid() {
+        return uuid;
+    }
+
+    /** Loader to load {@link TableMetadata}. */
+    public interface Loader {
+        TableMetadata load(Identifier identifier) throws 
Catalog.TableNotExistException;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 327c6b9676..8db9e723c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -357,15 +357,14 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+    protected TableSchema loadTableSchema(Identifier identifier) throws 
TableNotExistException {
         assertMainBranch(identifier);
         if (!JdbcUtils.tableExists(
                 connections, catalogKey, identifier.getDatabaseName(), 
identifier.getTableName())) {
             throw new TableNotExistException(identifier);
         }
         Path tableLocation = getTableLocation(identifier);
-        return new SchemaManager(fileIO, tableLocation)
-                .latest()
+        return tableSchemaInFileSystem(tableLocation, 
identifier.getBranchNameOrDefault())
                 .orElseThrow(
                         () -> new RuntimeException("There is no paimon table 
in " + tableLocation));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 4b4e90fb7c..97f9ffe567 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.rest;
 
-import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogLoader;
@@ -26,6 +25,7 @@ import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -60,14 +60,10 @@ import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -83,20 +79,16 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
-import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static 
org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
-import static org.apache.paimon.catalog.CatalogUtils.getTableType;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static 
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
 import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
 import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
@@ -171,6 +163,11 @@ public class RESTCatalog implements Catalog {
         return fileIO;
     }
 
+    @Override
+    public FileIO fileIO(Path path) {
+        return fileIO;
+    }
+
     @Override
     public List<String> listDatabases() {
         ListDatabasesResponse response =
@@ -281,13 +278,28 @@ public class RESTCatalog implements Catalog {
 
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
-        if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
-            return 
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
-        } else if (identifier.isSystemTable()) {
-            return getSystemTable(identifier);
-        } else {
-            return getDataOrFormatTable(identifier);
+        // TODO add lock from server
+        return CatalogUtils.loadTable(
+                this, identifier, this::loadTableMetadata, 
Lock.emptyFactory());
+    }
+
+    private TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
+        GetTableResponse response;
+        try {
+            response =
+                    client.get(
+                            resourcePaths.table(
+                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                            GetTableResponse.class,
+                            headers());
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
         }
+
+        TableSchema schema = TableSchema.create(response.getSchemaId(), 
response.getSchema());
+        return new TableMetadata(schema, response.getId());
     }
 
     @Override
@@ -520,56 +532,6 @@ public class RESTCatalog implements Catalog {
         }
     }
 
-    private Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
-        Preconditions.checkArgument(identifier.getSystemTableName() == null);
-
-        GetTableResponse response;
-        try {
-            response =
-                    client.get(
-                            resourcePaths.table(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()),
-                            GetTableResponse.class,
-                            headers());
-        } catch (NoSuchResourceException e) {
-            throw new TableNotExistException(identifier);
-        } catch (ForbiddenException e) {
-            throw new TableNoPermissionException(identifier, e);
-        }
-        TableType tableType = getTableType(response.getSchema().options());
-        if (tableType == TableType.FORMAT_TABLE) {
-            Schema schema = response.getSchema();
-            return buildFormatTableByTableSchema(
-                    identifier,
-                    schema.options(),
-                    schema.rowType(),
-                    schema.partitionKeys(),
-                    schema.comment());
-        }
-        TableSchema schema = TableSchema.create(response.getSchemaId(), 
response.getSchema());
-        FileStoreTable table =
-                FileStoreTableFactory.create(
-                        fileIO(),
-                        new Path(schema.options().get(PATH.key())),
-                        schema,
-                        new CatalogEnvironment(
-                                identifier,
-                                response.getId(),
-                                Lock.emptyFactory(),
-                                catalogLoader()));
-        if (tableType == TableType.OBJECT_TABLE) {
-            String objectLocation = table.coreOptions().objectLocation();
-            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
-            table =
-                    ObjectTable.builder()
-                            .underlyingTable(table)
-                            .objectLocation(objectLocation)
-                            .objectFileIO(this.fileIO())
-                            .build();
-        }
-        return table;
-    }
-
     private boolean isMetaStorePartitionedTable(Table table) {
         Options options = Options.fromMap(table.options());
         return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE));
@@ -579,17 +541,6 @@ public class RESTCatalog implements Catalog {
         return catalogAuth.getHeaders();
     }
 
-    private Table getSystemTable(Identifier identifier) throws 
TableNotExistException {
-        Table originTable =
-                getDataOrFormatTable(
-                        new Identifier(
-                                identifier.getDatabaseName(),
-                                identifier.getTableName(),
-                                identifier.getBranchName(),
-                                null));
-        return CatalogUtils.createSystemTable(identifier, originTable);
-    }
-
     private ScheduledExecutorService tokenRefreshExecutor() {
         if (refreshExecutor == null) {
             synchronized (this) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 97acfe7299..992425ed52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.Map;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
  * A object table refers to a directory that contains multiple objects 
(files), Object table
@@ -122,6 +123,7 @@ public interface ObjectTable extends FileStoreTable {
         }
 
         public ObjectTable build() {
+            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
             return new ObjectTableImpl(underlyingTable, objectFileIO, 
objectLocation);
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index c8b9192c1c..1c453a1b3b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -409,7 +409,7 @@ public abstract class CatalogTestBase {
                                 catalog.getTable(
                                         Identifier.create(
                                                 "test_db", 
"non_existing_table$snapshots")))
-                .withMessage("Table test_db.non_existing_table does not 
exist.");
+                .withMessage("Table test_db.non_existing_table$snapshots does 
not exist.");
 
         // Get system table throws TableNotExistException when system table 
type does not exist
         assertThatExceptionOfType(Catalog.TableNotExistException.class)
@@ -417,7 +417,7 @@ public abstract class CatalogTestBase {
                         () ->
                                 catalog.getTable(
                                         Identifier.create("test_db", 
"non_existing_table$schema1")))
-                .withMessage("Table test_db.non_existing_table does not 
exist.");
+                .withMessage("Table test_db.non_existing_table$schema1 does 
not exist.");
 
         // Get data table throws TableNotExistException when table does not 
exist
         assertThatExceptionOfType(Catalog.TableNotExistException.class)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
index a0f820e7ad..4f15049311 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
@@ -18,12 +18,13 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
@@ -166,7 +167,8 @@ public class TestRESTCatalog extends FileSystemCatalog {
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
         if (tableFullName2Schema.containsKey(identifier.getFullName())) {
             TableSchema schema = 
tableFullName2Schema.get(identifier.getFullName());
-            if (CatalogUtils.getTableType(schema.options()) == 
TableType.FORMAT_TABLE) {
+            Options options = Options.fromMap(schema.options());
+            if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
                 throw new UnsupportedOperationException("Only data table 
support alter table.");
             }
         } else {
@@ -189,12 +191,12 @@ public class TestRESTCatalog extends FileSystemCatalog {
     }
 
     @Override
-    protected TableMeta getDataTableMeta(Identifier identifier) throws 
TableNotExistException {
+    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
         if (tableFullName2Schema.containsKey(identifier.getFullName())) {
             TableSchema tableSchema = 
tableFullName2Schema.get(identifier.getFullName());
-            return new TableMeta(tableSchema, "uuid");
+            return new TableMetadata(tableSchema, "uuid");
         }
-        return super.getDataTableMeta(identifier);
+        return super.loadTableMetadata(identifier);
     }
 
     private Partition spec2Partition(Map<String, String> spec) {
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4e2d7b6acb..438c6971aa 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -40,10 +41,8 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.CatalogTableType;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -51,7 +50,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PartitionPathUtils;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 
@@ -115,7 +113,7 @@ import static 
org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
-import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
+import static org.apache.paimon.hive.HiveTableUtils.tryToFormatSchema;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
@@ -354,7 +352,7 @@ public class HiveCatalog extends AbstractCatalog {
                 Identifier.create(identifier.getDatabaseName(), 
identifier.getTableName());
         Table hmsTable = getHmsTable(tableIdentifier);
         Path location = getTableLocation(tableIdentifier, hmsTable);
-        TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable);
+        TableSchema schema = loadTableSchema(tableIdentifier, hmsTable);
 
         if (!metastorePartitioned(schema)) {
             return;
@@ -395,7 +393,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
             throws TableNotExistException {
-        TableSchema schema = getDataTableSchema(identifier);
+        TableSchema schema = this.loadTableSchema(identifier);
         CoreOptions options = CoreOptions.fromMap(schema.options());
         boolean tagToPart = options.tagToPartitionField() != null;
         if (metastorePartitioned(schema)) {
@@ -429,7 +427,7 @@ public class HiveCatalog extends AbstractCatalog {
     public void alterPartitions(
             Identifier identifier, List<org.apache.paimon.partition.Partition> 
partitions)
             throws TableNotExistException {
-        TableSchema tableSchema = getDataTableSchema(identifier);
+        TableSchema tableSchema = this.loadTableSchema(identifier);
         if (!tableSchema.partitionKeys().isEmpty()
                 && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
             for (org.apache.paimon.partition.Partition partition : partitions) 
{
@@ -674,32 +672,41 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected TableMeta getDataTableMeta(Identifier identifier) throws 
TableNotExistException {
-        return getDataTableMeta(identifier, getHmsTable(identifier));
+    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
+        return loadTableMetadata(identifier, getHmsTable(identifier));
     }
 
-    private TableMeta getDataTableMeta(Identifier identifier, Table table)
+    private TableMetadata loadTableMetadata(Identifier identifier, Table table)
             throws TableNotExistException {
-        return new TableMeta(
-                getDataTableSchema(identifier, table),
+        return new TableMetadata(
+                loadTableSchema(identifier, table),
                 identifier.getFullName() + "." + table.getCreateTime());
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+    public TableSchema loadTableSchema(Identifier identifier) throws 
TableNotExistException {
         Table table = getHmsTable(identifier);
-        return getDataTableSchema(identifier, table);
+        return loadTableSchema(identifier, table);
     }
 
-    private TableSchema getDataTableSchema(Identifier identifier, Table table)
+    private TableSchema loadTableSchema(Identifier identifier, Table table)
             throws TableNotExistException {
-        if (!isPaimonTable(table)) {
-            throw new TableNotExistException(identifier);
+        if (isPaimonTable(table)) {
+            return tableSchemaInFileSystem(
+                            getTableLocation(identifier, table),
+                            identifier.getBranchNameOrDefault())
+                    .orElseThrow(() -> new TableNotExistException(identifier));
+        }
+
+        if (!formatTableDisabled()) {
+            try {
+                Schema schema = tryToFormatSchema(table);
+                return TableSchema.create(0, schema);
+            } catch (UnsupportedOperationException ignored) {
+            }
         }
 
-        return tableSchemaInFileSystem(
-                        getTableLocation(identifier, table), 
identifier.getBranchNameOrDefault())
-                .orElseThrow(() -> new TableNotExistException(identifier));
+        throw new TableNotExistException(identifier);
     }
 
     @Override
@@ -835,39 +842,6 @@ public class HiveCatalog extends AbstractCatalog {
         renameHiveTable(fromView, toView);
     }
 
-    @Override
-    public org.apache.paimon.table.Table getDataOrFormatTable(Identifier 
identifier)
-            throws TableNotExistException {
-        Preconditions.checkArgument(identifier.getSystemTableName() == null);
-        Table table = getHmsTable(identifier);
-        try {
-            TableMeta tableMeta = getDataTableMeta(identifier, table);
-            return FileStoreTableFactory.create(
-                    fileIO,
-                    getTableLocation(identifier, table),
-                    tableMeta.schema(),
-                    new CatalogEnvironment(
-                            identifier,
-                            tableMeta.uuid(),
-                            Lock.factory(
-                                    lockFactory().orElse(null),
-                                    lockContext().orElse(null),
-                                    identifier),
-                            catalogLoader()));
-        } catch (TableNotExistException ignore) {
-        }
-
-        if (formatTableDisabled()) {
-            throw new TableNotExistException(identifier);
-        }
-
-        try {
-            return convertToFormatTable(table);
-        } catch (UnsupportedOperationException e) {
-            throw new TableNotExistException(identifier);
-        }
-    }
-
     @Override
     public void createFormatTable(Identifier identifier, Schema schema) {
         if (formatTableDisabled()) {
@@ -1278,7 +1252,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     private boolean isFormatTable(Table table) {
         try {
-            convertToFormatTable(table);
+            tryToFormatSchema(table);
             return true;
         } catch (UnsupportedOperationException e) {
             return false;
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 5e5af75e52..0007247bff 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FormatTable.Format;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
@@ -31,24 +31,25 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
 import static org.apache.paimon.hive.HiveCatalog.isView;
 import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 
 class HiveTableUtils {
 
-    public static FormatTable convertToFormatTable(Table hiveTable) {
+    public static Schema tryToFormatSchema(Table hiveTable) {
         if (isView(hiveTable)) {
             throw new UnsupportedOperationException("Hive view is not 
supported.");
         }
 
-        Identifier identifier = new Identifier(hiveTable.getDbName(), 
hiveTable.getTableName());
-        Map<String, String> options = new HashMap<>(hiveTable.getParameters());
+        Options options = Options.fromMap(hiveTable.getParameters());
         List<String> partitionKeys = 
getFieldNames(hiveTable.getPartitionKeys());
         RowType rowType = createRowType(hiveTable);
         String comment = options.remove(COMMENT_PROP);
@@ -65,20 +66,19 @@ class HiveTableUtils {
         } else if (inputFormat.contains("Text")) {
             format = Format.CSV;
             // hive default field delimiter is '\u0001'
-            options.put(
-                    FIELD_DELIMITER.key(),
-                    serdeInfo.getParameters().getOrDefault(FIELD_DELIM, 
"\u0001"));
+            options.set(
+                    FIELD_DELIMITER, 
serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
         } else {
             throw new UnsupportedOperationException("Unsupported table: " + 
hiveTable);
         }
 
-        return FormatTable.builder()
-                .identifier(identifier)
-                .rowType(rowType)
-                .partitionKeys(partitionKeys)
-                .location(location)
-                .format(format)
-                .options(options)
+        Schema.Builder builder = Schema.newBuilder();
+        rowType.getFields().forEach(f -> builder.column(f.name(), f.type(), 
f.description()));
+        options.set(PATH, location);
+        options.set(TYPE, FORMAT_TABLE);
+        options.set(FILE_FORMAT, format.name().toLowerCase());
+        return builder.partitionKeys(partitionKeys)
+                .options(options.toMap())
                 .comment(comment)
                 .build();
     }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index ea669d254f..ff2bd04d1f 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -305,8 +305,6 @@ public class HiveCatalogTest extends CatalogTestBase {
             Thread thread1 =
                     new Thread(
                             () -> {
-                                System.out.println(
-                                        "First thread started at " + 
System.currentTimeMillis());
                                 try {
                                     
tables1.addAll(catalog.listTables(databaseName));
                                 } catch (Catalog.DatabaseNotExistException e) {
@@ -316,8 +314,6 @@ public class HiveCatalogTest extends CatalogTestBase {
             Thread thread2 =
                     new Thread(
                             () -> {
-                                System.out.println(
-                                        "Second thread started at " + 
System.currentTimeMillis());
                                 try {
                                     
tables2.addAll(catalog.listTables(databaseName));
                                 } catch (Catalog.DatabaseNotExistException e) {

Reply via email to