This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 77670204c9 [core] Remove Catalog.getTableLocation interface (#4718)
77670204c9 is described below

commit 77670204c98501f9e86f04bc16d651ba52f9594a
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 16 19:41:24 2024 +0800

    [core] Remove Catalog.getTableLocation interface (#4718)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  1 -
 .../java/org/apache/paimon/catalog/Catalog.java    |  9 -----
 .../org/apache/paimon/catalog/DelegateCatalog.java |  6 ----
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  6 ----
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 15 --------
 .../paimon/flink/clone/CopyFileOperator.java       | 41 ++++++++++++++++++----
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |  8 ++---
 .../sink/partition/PartitionMarkDoneTest.java      |  2 +-
 8 files changed, 38 insertions(+), 50 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b56fec279a..a1b41e3b8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -496,7 +496,6 @@ public abstract class AbstractCatalog implements Catalog {
         return Optional.empty();
     }
 
-    @Override
     public Path getTableLocation(Identifier identifier) {
         return new Path(newDatabasePath(identifier.getDatabaseName()), 
identifier.getTableName());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d919c59782..c3808caa13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,7 +20,6 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -136,14 +135,6 @@ public interface Catalog extends AutoCloseable {
      */
     Table getTable(Identifier identifier) throws TableNotExistException;
 
-    /**
-     * Get the table location in this catalog. If the table exists, return the 
location of the
-     * table; If the table does not exist, construct the location for table.
-     *
-     * @return the table location
-     */
-    Path getTableLocation(Identifier identifier);
-
     /**
      * Get names of all tables under this database. An empty list is returned 
if none exists.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index ec14d53a2b..2298626b0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -147,11 +146,6 @@ public class DelegateCatalog implements Catalog {
         wrapped.renameView(fromView, toView, ignoreIfNotExists);
     }
 
-    @Override
-    public Path getTableLocation(Identifier identifier) {
-        return wrapped.getTableLocation(identifier);
-    }
-
     @Override
     public void createPartition(Identifier identifier, Map<String, String> 
partitions)
             throws TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 03b257efbf..86b87e25e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -177,11 +176,6 @@ public class RESTCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public Path getTableLocation(Identifier identifier) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
         return new ArrayList<String>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 3a7f9790cc..dd95c48af8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
@@ -525,20 +524,6 @@ public class FlinkCatalog extends AbstractCatalog {
             registerLogSystem(catalog, identifier, options, classLoader);
         }
 
-        // remove table path
-        String path = options.remove(PATH.key());
-        if (path != null) {
-            Path expectedPath = catalog.getTableLocation(identifier);
-            if (!new Path(path).equals(expectedPath)) {
-                throw new CatalogException(
-                        String.format(
-                                "You specified the Path when creating the 
table, "
-                                        + "but the Path '%s' is different from 
where it should be '%s'. "
-                                        + "Please remove the Path.",
-                                path, expectedPath));
-            }
-        }
-
         if (catalogTable instanceof CatalogTable) {
             return fromCatalogTable(((CatalogTable) 
catalogTable).copy(options));
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index d355424299..8bcaa2a207 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -18,12 +18,14 @@
 
 package org.apache.paimon.flink.clone;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -32,6 +34,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /** A Operator to copy files. */
@@ -43,8 +46,11 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
     private final Map<String, String> sourceCatalogConfig;
     private final Map<String, String> targetCatalogConfig;
 
-    private Catalog sourceCatalog;
-    private Catalog targetCatalog;
+    private transient Catalog sourceCatalog;
+    private transient Catalog targetCatalog;
+
+    private transient Map<String, Path> srcLocations;
+    private transient Map<String, Path> targetLocations;
 
     public CopyFileOperator(
             Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
@@ -58,6 +64,8 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
                 
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
         targetCatalog =
                 
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+        srcLocations = new HashMap<>();
+        targetLocations = new HashMap<>();
     }
 
     @Override
@@ -66,12 +74,29 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
 
         FileIO sourceTableFileIO = sourceCatalog.fileIO();
         FileIO targetTableFileIO = targetCatalog.fileIO();
+
         Path sourceTableRootPath =
-                sourceCatalog.getTableLocation(
-                        
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+                srcLocations.computeIfAbsent(
+                        cloneFileInfo.getSourceIdentifier(),
+                        key -> {
+                            try {
+                                return pathOfTable(
+                                        
sourceCatalog.getTable(Identifier.fromString(key)));
+                            } catch (Catalog.TableNotExistException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
         Path targetTableRootPath =
-                targetCatalog.getTableLocation(
-                        
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+                targetLocations.computeIfAbsent(
+                        cloneFileInfo.getTargetIdentifier(),
+                        key -> {
+                            try {
+                                return pathOfTable(
+                                        
targetCatalog.getTable(Identifier.fromString(key)));
+                            } catch (Catalog.TableNotExistException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
 
         String filePathExcludeTableRoot = 
cloneFileInfo.getFilePathExcludeTableRoot();
         Path sourcePath = new Path(sourceTableRootPath + 
filePathExcludeTableRoot);
@@ -110,6 +135,10 @@ public class CopyFileOperator extends 
AbstractStreamOperator<CloneFileInfo>
         output.collect(streamRecord);
     }
 
+    private Path pathOfTable(Table table) {
+        return new Path(table.options().get(CoreOptions.PATH.key()));
+    }
+
     @Override
     public void close() throws Exception {
         if (sourceCatalog != null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index e4286eb181..734a47dead 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -102,6 +102,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Test for {@link FlinkCatalog}. */
 public class FlinkCatalogTest {
+
     private static final String TESTING_LOG_STORE = "testing";
 
     private final ObjectPath path1 = new ObjectPath("db1", "t1");
@@ -348,12 +349,7 @@ public class FlinkCatalogTest {
         CatalogTable table1 = createTable(options);
         assertThatThrownBy(() -> catalog.createTable(this.path1, table1, 
false))
                 .hasMessageContaining(
-                        "You specified the Path when creating the table, "
-                                + "but the Path '/unknown/path' is different 
from where it should be");
-
-        options.put(PATH.key(), warehouse + "/db1.db/t1");
-        CatalogTable table2 = createTable(options);
-        catalog.createTable(this.path1, table2, false);
+                        "The current catalog FileSystemCatalog does not 
support specifying the table path when creating a table.");
     }
 
     @ParameterizedTest
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index f0f4596c61..9e5fe7ff9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -83,7 +83,7 @@ class PartitionMarkDoneTest extends TableTestBase {
                         .build();
         catalog.createTable(identifier, schema, true);
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
-        Path location = catalog.getTableLocation(identifier);
+        Path location = table.location();
         Path successFile = new Path(location, "a=0/_SUCCESS");
         PartitionMarkDone markDone =
                 PartitionMarkDone.create(false, false, new 
MockOperatorStateStore(), table).get();

Reply via email to