danhuawang opened a new issue, #11596:
URL: https://github.com/apache/gravitino/issues/11596

   ### Version
   
   main branch
   
   ### Describe what's wrong
   
   The Gravitino Flink connector cannot open an Iceberg catalog whose Gravitino 
`catalog-backend` is `jdbc`. When Flink calls `useCatalog` on such a catalog, 
`GravitinoIcebergCatalog.open()` rebuilds a native Iceberg catalog client-side 
via `org.apache.iceberg.flink.FlinkCatalogFactory`, passing the Gravitino 
backend value straight through as the Iceberg `catalog-type`.
   
   The root cause is in 
`GravitinoIcebergCatalogFactory.toIcebergCatalogOptions()`, which maps the 
Gravitino `catalog-backend` value directly to the Iceberg Flink `catalog-type`:
   
   ```java
   String catalogBackend = 
catalogOptions.get(GRAVITINO_ICEBERG_CATALOG_BACKEND); // "jdbc"
   icebergCatalogOptions.put(ICEBERG_CATALOG_TYPE, catalogBackend);             
  // catalog-type=jdbc
   ```
   
   Iceberg's `FlinkCatalogFactory.createCatalogLoader()` only accepts `hive`, 
`hadoop`, or `rest` as `catalog-type`. A `jdbc` backend therefore throws 
`UnsupportedOperationException`. JDBC-backed Iceberg catalogs are supported by 
Iceberg natively, but in the Flink integration they must be loaded via 
`catalog-impl` (e.g. `org.apache.iceberg.jdbc.JdbcCatalog`), not 
`catalog-type`. The connector does not perform this translation.
   
   Impact by backend:
   - Iceberg Hive backend → `catalog-type=hive` ✅ works
   - Iceberg REST backend → `catalog-type=rest` ✅ works
   - Iceberg JDBC backend → `catalog-type=jdbc` ❌ crashes
   
   ### Error message and/or stacktrace
   
   ```
   java.lang.UnsupportedOperationException: Unknown catalog-type: jdbc (Must be 
'hive', 'hadoop' or 'rest')
       at 
org.apache.iceberg.flink.FlinkCatalogFactory.createCatalogLoader(FlinkCatalogFactory.java:119)
       at 
org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:144)
       at 
org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
       at 
org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalog.open(GravitinoIcebergCatalog.java:98)
       at 
org.apache.flink.table.catalog.CatalogManager.lambda$getCatalog$0(CatalogManager.java:405)
       at 
org.apache.flink.table.catalog.CatalogManager.getCatalog(CatalogManager.java:402)
       at 
org.apache.flink.table.catalog.CatalogManager.setCurrentCatalog(CatalogManager.java:458)
       at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.useCatalog(TableEnvironmentImpl.java:1222)
   ```
   
   ### How to reproduce
   
   + Gravitino version: main branch
   + Create an Iceberg catalog in Gravitino with `catalog-backend=jdbc` (e.g. 
JDBC URI plus `jdbc-user` / `jdbc-password` / `jdbc-driver`).
   + Register the Gravitino catalog store in a Flink `TableEnvironment` using 
the `gravitino` catalog store.
   + Run `USE CATALOG <iceberg_jdbc_catalog>;` (or any statement that triggers 
catalog `open()`).
   + Observe the `UnsupportedOperationException: Unknown catalog-type: jdbc`.
   
   ### Additional context
   
   In this environment, the affected catalog (`catalog_1`) is also the default 
catalog of Gravitino's built-in Iceberg REST (IRC) service, configured as a 
dynamic config provider:
   
   ```properties
   gravitino.iceberg-rest.catalog-config-provider = dynamic-config-provider
   gravitino.iceberg-rest.gravitino-uri = <gravitino-uri>
   gravitino.iceberg-rest.gravitino-metalake = test
   gravitino.iceberg-rest.default-catalog-name = catalog_1
   ```
   
   So the same JDBC-backed Iceberg catalog is reachable through the Gravitino 
IRC endpoint, but the Flink connector still resolves it via the native 
`catalog-type=jdbc` path rather than going through REST, which is what triggers 
the failure.
   
   `GravitinoIcebergCatalogFactory.toIcebergCatalogOptions()` should translate 
a `jdbc` backend into the Iceberg Flink `catalog-impl` form (e.g. 
`catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog`) instead of passing 
`catalog-type=jdbc`, or otherwise fail with a clear "unsupported backend" 
message. The Gravitino-side property maps in `IcebergPropertiesUtils` already 
carry JDBC properties (`jdbc-user`, `jdbc-password`, `jdbc-driver`), so the 
server side supports JDBC backends — only the Flink connector path is broken. 
Discovered while building Flink connector view E2E tests for Iceberg (related 
to PR #11349).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to