FANNG1 commented on code in PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#discussion_r2988177122
##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -252,4 +251,13 @@ private void applyGenericTableAlter(
throw new CatalogException(e);
}
}
+
+ protected Map<String, String> toGravitinoGenericTableProperties(
+ ResolvedCatalogTable resolvedTable) {
+ return FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable, catalogCompat());
+ }
+
+ protected CatalogTable toFlinkGenericTable(Table table) {
+ return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
+ }
Review Comment:
I prefer to keep these thin wrapper methods on the catalog class. They
delegate to `FlinkGenericTableUtil`, but they also bind the call to the catalog
instance so the version-specific `catalogCompat()` hook can be applied
naturally by subclasses such as the Flink 1.18 catalog. Moving them back to the
util class would make the compat path more scattered rather than simpler.
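
A minimal sketch of the wrapper-plus-hook shape described above, under stated assumptions: `Compat`, `TableUtil`, `CommonCatalog`, and `VersionedCatalog` are hypothetical stand-ins written for illustration, not the real Gravitino or Flink types. It only shows why a thin instance-level wrapper lets a subclass change the compat path by overriding one hook.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only: simplified stand-ins, not the real Gravitino or Flink types. */
final class WrapperHookSketch {

  /** Hypothetical stand-in for the version-specific compat object. */
  interface Compat {
    String tableApi();
  }

  /** Hypothetical stand-in for a static helper such as FlinkGenericTableUtil. */
  static final class TableUtil {
    private TableUtil() {}

    static Map<String, String> toProperties(String tableName, Compat compat) {
      Map<String, String> props = new LinkedHashMap<>();
      props.put("name", tableName);
      props.put("table-api", compat.tableApi());
      return props;
    }
  }

  static class CommonCatalog {
    /** Hook that a versioned subclass may override. */
    protected Compat compat() {
      return () -> "default";
    }

    /** Thin wrapper: binds the static helper to this instance's compat hook. */
    protected Map<String, String> toProperties(String tableName) {
      return TableUtil.toProperties(tableName, compat());
    }
  }

  /** Hypothetical versioned catalog; it overrides only the hook, not the wrapper. */
  static class VersionedCatalog extends CommonCatalog {
    @Override
    protected Compat compat() {
      return () -> "versioned";
    }
  }

  public static void main(String[] args) {
    System.out.println(new CommonCatalog().toProperties("orders"));
    System.out.println(new VersionedCatalog().toProperties("orders"));
  }
}
```

In this sketch every caller goes through `toProperties(tableName)` on the catalog, so no call site has to decide which compat object to pass to the static helper.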
##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java:
##########
@@ -30,6 +30,20 @@ public class GravitinoJdbcCatalogFactoryOptions {
/** Identifier for the {@link GravitinoJdbcCatalog}. */
public static final String POSTGRESQL_IDENTIFIER = "gravitino-jdbc-postgresql";
+ public static final ConfigOption<String> BASE_URL =
+ ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_URL).stringType().noDefaultValue();
+
+ public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_USER).stringType().noDefaultValue();
+
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD).stringType().noDefaultValue();
+
public static final ConfigOption<String> DEFAULT_DATABASE =
- ConfigOptions.key("default-database").stringType().noDefaultValue();
+ ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE)
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<String> DRIVER =
+ ConfigOptions.key(JdbcPropertiesConstants.FLINK_DRIVER).stringType().noDefaultValue();
}
Review Comment:
I kept these options here intentionally. This does not introduce a new
user-facing requirement compared with the old implementation; it centralizes
the Flink-side JDBC option definitions so `requiredOptions()`,
`optionalOptions()`, and option lookups use the same source of truth after the
versioned refactor. That keeps the factory logic clearer and avoids scattering
the option keys across the implementation.
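
As a rough illustration of the "single source of truth" argument above, the sketch below defines each option once and reuses the same constants for the required set, the optional set, and value lookups. Everything in it is an assumption made for the example: `Option` is a hypothetical minimal type standing in for Flink's `ConfigOption`, the key strings are placeholders, and the required/optional split is arbitrary rather than what the PR actually declares.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only: not the real factory or option classes. */
final class OptionRegistrySketch {

  /** Hypothetical minimal option type; the real code uses Flink's ConfigOption. */
  static final class Option {
    private final String key;

    Option(String key) {
      this.key = key;
    }

    String key() {
      return key;
    }
  }

  /** Single place where the option keys are defined (key strings are placeholders). */
  static final class JdbcOptions {
    static final Option BASE_URL = new Option("base-url");
    static final Option USERNAME = new Option("username");
    static final Option PASSWORD = new Option("password");
    static final Option DEFAULT_DATABASE = new Option("default-database");

    private JdbcOptions() {}
  }

  static final class Factory {
    // The required/optional split below is arbitrary and only for illustration.
    Set<Option> requiredOptions() {
      return new LinkedHashSet<>(Arrays.asList(JdbcOptions.BASE_URL, JdbcOptions.DEFAULT_DATABASE));
    }

    Set<Option> optionalOptions() {
      return new LinkedHashSet<>(Arrays.asList(JdbcOptions.USERNAME, JdbcOptions.PASSWORD));
    }

    /** Lookups reuse the same constants, so no key string is repeated here. */
    String lookup(Map<String, String> conf, Option option) {
      return conf.get(option.key());
    }

    /** Returns the key of the first missing required option, or null if none is missing. */
    String firstMissingRequired(Map<String, String> conf) {
      for (Option option : requiredOptions()) {
        if (!conf.containsKey(option.key())) {
          return option.key();
        }
      }
      return null;
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("base-url", "jdbc:postgresql://localhost:5432/");
    Factory factory = new Factory();
    System.out.println("url = " + factory.lookup(conf, JdbcOptions.BASE_URL));
    System.out.println("missing = " + factory.firstMissingRequired(conf));
  }
}
```

Renaming a key in this layout touches one constant, and validation and lookups follow automatically.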
##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -570,7 +572,21 @@ protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
schemaAndTablePropertiesConverter.toFlinkTableProperties(
catalogOptions, table.properties(), tablePath);
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
- return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
+ return newCatalogTable(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
+ }
+
+ protected CatalogTable newCatalogTable(
+ org.apache.flink.table.api.Schema schema,
+ String comment,
+ List<String> partitionKeys,
+ Map<String, String> options) {
+ return catalogCompat().createCatalogTable(schema, comment, partitionKeys, options);
+ }
+
+ protected CatalogCompat catalogCompat() {
+ // Versioned catalog entry classes override this hook when the Flink minor has a different
+ // catalog/table API path.
+ return DefaultCatalogCompat.INSTANCE;
Review Comment:
I do not think this needs a code change in this PR. The current structure
uses a shared common layer plus version-specific adapters/hooks
(`catalogCompat()` and the versioned catalog/factory classes) to isolate Flink
minor-version API differences. That is the main reason for this split: keep the
common behavior in one place and localize the Flink-version-specific parts in
each version module.
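
For readers following the split, here is a simplified sketch of the common-layer-plus-adapter layout, under stated assumptions. `TableDesc`, `OtherVersionCatalogCompat`, and `OtherVersionCatalog` are hypothetical names, the method signatures are reduced versions of the ones in the diff, and the `createdVia` labels are placeholders that do not describe any real Flink API path.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only: simplified stand-ins, not the real Gravitino or Flink types. */
final class VersionedCompatSketch {

  /** Hypothetical simplified table value; the real code returns Flink's CatalogTable. */
  static final class TableDesc {
    final String comment;
    final List<String> partitionKeys;
    final Map<String, String> options;
    final String createdVia;

    TableDesc(
        String comment, List<String> partitionKeys, Map<String, String> options, String createdVia) {
      this.comment = comment;
      this.partitionKeys = partitionKeys;
      this.options = options;
      this.createdVia = createdVia;
    }
  }

  /** Adapter interface: the only place where version-specific table creation happens. */
  interface CatalogCompat {
    TableDesc createCatalogTable(
        String comment, List<String> partitionKeys, Map<String, String> options);
  }

  /** Default adapter shared by versions that need no special handling. */
  static final class DefaultCatalogCompat implements CatalogCompat {
    static final DefaultCatalogCompat INSTANCE = new DefaultCatalogCompat();

    private DefaultCatalogCompat() {}

    @Override
    public TableDesc createCatalogTable(
        String comment, List<String> partitionKeys, Map<String, String> options) {
      return new TableDesc(comment, partitionKeys, options, "default-path");
    }
  }

  /** Hypothetical adapter that would live in a single version module. */
  static final class OtherVersionCatalogCompat implements CatalogCompat {
    @Override
    public TableDesc createCatalogTable(
        String comment, List<String> partitionKeys, Map<String, String> options) {
      return new TableDesc(comment, partitionKeys, options, "other-version-path");
    }
  }

  /** Common layer: conversion logic is written once and never branches on the version. */
  static class BaseCatalog {
    protected CatalogCompat catalogCompat() {
      return DefaultCatalogCompat.INSTANCE;
    }

    TableDesc toFlinkTable(String comment) {
      List<String> partitionKeys = Collections.singletonList("dt");
      Map<String, String> options = Collections.singletonMap("connector", "jdbc");
      return catalogCompat().createCatalogTable(comment, partitionKeys, options);
    }
  }

  /** Hypothetical version-module catalog: swaps the adapter and nothing else. */
  static class OtherVersionCatalog extends BaseCatalog {
    @Override
    protected CatalogCompat catalogCompat() {
      return new OtherVersionCatalogCompat();
    }
  }

  public static void main(String[] args) {
    System.out.println(new BaseCatalog().toFlinkTable("orders table").createdVia);
    System.out.println(new OtherVersionCatalog().toFlinkTable("orders table").createdVia);
  }
}
```

The shared `toFlinkTable` logic is identical for both catalogs in the sketch; only the object returned by `catalogCompat()` differs.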