yunfengzhou-hub commented on code in PR #4381:
URL: https://github.com/apache/paimon/pull/4381#discussion_r1821938741


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java:
##########
@@ -243,16 +257,34 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
         newOptions.putAll(origin.getOptions());
         newOptions.putAll(dynamicOptions);
 
-        // notice that the Paimon table schema must be the same with the Flink's
-        if (origin instanceof DataCatalogTable) {
-            FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
-            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+        FileStoreTable fileStoreTable;
+
+        // The following if conditions provide a shortcut to acquire Paimon table.
+        if (origin instanceof FormatCatalogTable) {
+            fileStoreTable = (FileStoreTable) ((FormatCatalogTable) origin).table();
+        } else if (origin instanceof DataCatalogTable) {
+            fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
+        } else if (flinkCatalog == null) {
+            LOG.warn(
+                    "FlinkCatalog is null. The process to find out Paimon table might be incorrect.");
+            fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
         } else {
-            table =
-                    FileStoreTableFactory.create(createCatalogContext(context))
-                            .copyWithoutTimeTravel(newOptions);
+            // In case the shortcut is not matched, the paimon table can still be acquired from
+            // catalog.
+            Identifier identifier =
+                    Identifier.create(
+                            context.getObjectIdentifier().getDatabaseName(),
+                            context.getObjectIdentifier().getObjectName());
+            try {
+                fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);

Review Comment:
   The return type of `DynamicTableFactory.Context#getCatalogTable().getOrigin()` is `CatalogTable`, and `CatalogMaterializedTable` is not assignable from `CatalogTable`.
   
   The problem I want to solve lies in the following code:
   ```java
   if (origin instanceof DataCatalogTable) {
       FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
       table = fileStoreTable.copyWithoutTimeTravel(newOptions);
   } else {
       table =
               FileStoreTableFactory.create(createCatalogContext(context))
                       .copyWithoutTimeTravel(newOptions);
   }
   ```
   The Flink Paimon catalog assumes that if the origin table is not a `DataCatalogTable`, it must support `FileStoreTableFactory#create` (i.e. the scheme of the origin table's path URL can be matched with a `FileIOLoader`). In practice, however, I encountered a custom FileIO that does not support SPI dynamic loading, so an exception is thrown as soon as execution reaches `FileStoreTableFactory#create`.
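
   The failure mode described above can be illustrated with a minimal, self-contained sketch. It does not use Paimon's real classes: `SchemeLoader`, `findLoader`, `createOrThrow`, and the `custom-fs` scheme are hypothetical stand-ins, and the lookup is only assumed to resemble scheme-based SPI discovery via `java.util.ServiceLoader`. The point is that a path whose scheme has no SPI-registered loader cannot be resolved, so any code path that relies solely on that lookup throws.

```java
import java.net.URI;
import java.util.Optional;
import java.util.ServiceLoader;

public class SpiSchemeLookupSketch {

    /** Hypothetical stand-in for a loader that is discovered through Java SPI. */
    public interface SchemeLoader {
        String getScheme();
    }

    /** Returns the SPI-registered loader whose scheme matches the path, if any. */
    public static Optional<SchemeLoader> findLoader(URI path) {
        String scheme = path.getScheme();
        if (scheme == null) {
            return Optional.empty();
        }
        // Only loaders listed under META-INF/services are visible here.
        for (SchemeLoader loader : ServiceLoader.load(SchemeLoader.class)) {
            if (scheme.equalsIgnoreCase(loader.getScheme())) {
                return Optional.of(loader);
            }
        }
        return Optional.empty();
    }

    /** Mirrors the reported problem: with no registered loader, creation throws. */
    public static SchemeLoader createOrThrow(URI path) {
        return findLoader(path)
                .orElseThrow(
                        () ->
                                new IllegalStateException(
                                        "No loader registered via SPI for scheme: "
                                                + path.getScheme()));
    }

    public static void main(String[] args) {
        // Hypothetical path served by a custom file system that is not SPI-registered.
        URI custom = URI.create("custom-fs://bucket/warehouse/db.db/t");
        System.out.println("loader found: " + findLoader(custom).isPresent());
        try {
            createOrThrow(custom);
        } catch (IllegalStateException e) {
            System.out.println("creation failed: " + e.getMessage());
        }
    }
}
```

   Under these assumptions, a table object that is already available (for example one obtained from the catalog) avoids the scheme lookup entirely, which is the motivation for the extra branches in the diff.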



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

