lasdf1234 commented on code in PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#discussion_r2987876242


##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java:
##########
@@ -570,7 +572,21 @@ protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
         schemaAndTablePropertiesConverter.toFlinkTableProperties(
             catalogOptions, table.properties(), tablePath);
     List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
-    return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
+    return newCatalogTable(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
+  }
+
+  protected CatalogTable newCatalogTable(
+      org.apache.flink.table.api.Schema schema,
+      String comment,
+      List<String> partitionKeys,
+      Map<String, String> options) {
+    return catalogCompat().createCatalogTable(schema, comment, partitionKeys, options);
+  }
+
+  protected CatalogCompat catalogCompat() {
+    // Versioned catalog entry classes override this hook when the Flink minor has a different
+    // catalog/table API path.
+    return DefaultCatalogCompat.INSTANCE;

Review Comment:
   Do other OSS projects that stay compatible with multiple versions implement
   it in the same way?
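
   For reference while discussing this, the hook in the hunk above has the shape
   of an overridable factory method: the base class returns a default compat
   object and a versioned subclass swaps it. The sketch below is only an
   illustration under that assumption. Every name in it is hypothetical, it does
   not use Flink types, and it is not the PR's actual `CatalogCompat`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a per-version table factory (not the PR's CatalogCompat).
interface TableCompatSketch {
  String createTable(
      String schema, String comment, List<String> partitionKeys, Map<String, String> options);
}

// Default path, used when a version needs no special handling.
final class DefaultTableCompatSketch implements TableCompatSketch {
  static final DefaultTableCompatSketch INSTANCE = new DefaultTableCompatSketch();

  private DefaultTableCompatSketch() {}

  @Override
  public String createTable(
      String schema, String comment, List<String> partitionKeys, Map<String, String> options) {
    return "default-api:" + schema + partitionKeys;
  }
}

// Alternative path for a version whose table-creation API differs (assumed for illustration).
final class NewerTableCompatSketch implements TableCompatSketch {
  static final NewerTableCompatSketch INSTANCE = new NewerTableCompatSketch();

  private NewerTableCompatSketch() {}

  @Override
  public String createTable(
      String schema, String comment, List<String> partitionKeys, Map<String, String> options) {
    return "newer-api:" + schema + partitionKeys;
  }
}

// Shared catalog logic: it always goes through the compat() hook.
class BaseCatalogSketch {
  protected TableCompatSketch compat() {
    return DefaultTableCompatSketch.INSTANCE;
  }

  final String newTable(
      String schema, String comment, List<String> partitionKeys, Map<String, String> options) {
    return compat().createTable(schema, comment, partitionKeys, options);
  }
}

// Versioned entry class: overrides only the hook, inherits everything else.
class NewerMinorCatalogSketch extends BaseCatalogSketch {
  @Override
  protected TableCompatSketch compat() {
    return NewerTableCompatSketch.INSTANCE;
  }
}

class CompatHookDemo {
  public static void main(String[] args) {
    List<String> keys = List.of("dt");
    System.out.println(new BaseCatalogSketch().newTable("s", "c", keys, Map.of()));
    System.out.println(new NewerMinorCatalogSketch().newTable("s", "c", keys, Map.of()));
  }
}
```

   In this shape the shared code never branches on the version. The trade-off is
   one extra subclass per version that differs.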
   



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java:
##########
@@ -252,4 +251,13 @@ private void applyGenericTableAlter(
       throw new CatalogException(e);
     }
   }
+
+  protected Map<String, String> toGravitinoGenericTableProperties(
+      ResolvedCatalogTable resolvedTable) {
+    return FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable, catalogCompat());
+  }
+
+  protected CatalogTable toFlinkGenericTable(Table table) {
+    return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
+  }

Review Comment:
   Can we move these methods back to the Util class?



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java:
##########
@@ -30,6 +30,20 @@ public class GravitinoJdbcCatalogFactoryOptions {
   /** Identifier for the {@link GravitinoJdbcCatalog}. */
   public static final String POSTGRESQL_IDENTIFIER = "gravitino-jdbc-postgresql";
 
+  public static final ConfigOption<String> BASE_URL =
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_URL).stringType().noDefaultValue();
+
+  public static final ConfigOption<String> USERNAME =
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_USER).stringType().noDefaultValue();
+
+  public static final ConfigOption<String> PASSWORD =
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD).stringType().noDefaultValue();
+
   public static final ConfigOption<String> DEFAULT_DATABASE =
-      ConfigOptions.key("default-database").stringType().noDefaultValue();
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE)
+          .stringType()
+          .noDefaultValue();
+
+  public static final ConfigOption<String> DRIVER =
+      ConfigOptions.key(JdbcPropertiesConstants.FLINK_DRIVER).stringType().noDefaultValue();
 }

Review Comment:
   I used the old version and configured these properties, and Flink ran
   normally. Is it necessary to declare these properties here?


