FANNG1 commented on code in PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#discussion_r2987012016


##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java:
##########
@@ -73,6 +73,24 @@ public Catalog createCatalog(Context context) {
         helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
   }
 
+  protected Catalog newCatalog(
+      String catalogName,
+      String defaultDatabase,
+      java.util.Map<String, String> catalogOptions,
+      SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+      PartitionConverter partitionConverter,
+      @javax.annotation.Nullable HiveConf hiveConf,

Review Comment:
  Fixed in b68f1e918. I replaced the fully qualified 
`@javax.annotation.Nullable` annotation with a regular `@Nullable` import to 
match the repository style.



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java:
##########
@@ -46,15 +47,25 @@ public org.apache.flink.table.catalog.Catalog 
createCatalog(Context context) {
     context.getOptions().remove(JdbcPropertiesConstants.FLINK_DRIVER);
     final FactoryUtil.CatalogFactoryHelper helper =
         FactoryUtils.createCatalogFactoryHelper(this, context);
+    helper.validate();

Review Comment:
  I kept this validation step. The JDBC factory still needs to validate the 
effective option set after the versioned refactor, especially because the 
Flink-facing options and the backend JDBC catalog options are not identical. 
Keeping the check here rejects invalid configurations early instead of letting 
them surface later during catalog creation.
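
  The fail-fast idea can be sketched without any Flink dependencies. This is a 
simplified illustration, not Flink's actual `FactoryUtil` API; the class and 
key names below are made up for the example:

```java
import java.util.Map;
import java.util.Set;

public class ValidateSketch {
  // Check the effective option set up front, so a bad configuration fails
  // with a clear message instead of surfacing later during catalog creation.
  static void validate(Map<String, String> options, Set<String> required) {
    for (String key : required) {
      if (!options.containsKey(key)) {
        throw new IllegalArgumentException("Missing required option: " + key);
      }
    }
  }

  public static void main(String[] args) {
    // Complete configuration: passes silently.
    validate(
        Map.of("base-url", "jdbc:mysql://host/db", "driver", "com.mysql.cj.jdbc.Driver"),
        Set.of("base-url", "driver"));
    // Incomplete configuration: rejected immediately.
    try {
      validate(Map.of("base-url", "jdbc:mysql://host/db"), Set.of("base-url", "driver"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```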



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java:
##########
@@ -67,11 +78,15 @@ public PartitionConverter partitionConverter() {
 
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
-    return Collections.emptySet();
+    return ImmutableSet.of(

Review Comment:
   I kept this change. The required option set still needs to be enforced in 
the common JDBC factory after the versioned split; otherwise the factory can 
accept incomplete definitions and fail later in a less obvious way. This is a 
behavior-preserving validation change rather than a formatting change.
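
  To make the behavioral point concrete: a generic helper can only enforce the 
options a factory declares, so returning an empty set silently disables that 
check. A dependency-free sketch (illustrative names, not Flink's API):

```java
import java.util.Map;
import java.util.Set;

public class RequiredOptionsSketch {
  interface Factory {
    Set<String> requiredOptions();
  }

  // Generic validation driven by the factory's own declaration.
  static void checkRequired(Factory factory, Map<String, String> options) {
    for (String key : factory.requiredOptions()) {
      if (!options.containsKey(key)) {
        throw new IllegalArgumentException("Missing required option: " + key);
      }
    }
  }

  public static void main(String[] args) {
    Factory lenient = () -> Set.of();               // nothing is enforced
    Factory strict = () -> Set.of("uri", "driver"); // incomplete configs fail fast

    Map<String, String> incomplete = Map.of("uri", "jdbc:mysql://host/db");
    checkRequired(lenient, incomplete); // passes, masking the missing driver
    try {
      checkRequired(strict, incomplete);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```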



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java:
##########
@@ -43,10 +43,19 @@ public Catalog createCatalog(Context context) {
         FactoryUtils.createCatalogFactoryHelper(this, context);
     String defaultDatabase =
         
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
-    return new GravitinoPaimonCatalog(
+    return newCatalog(
         context, defaultDatabase, schemaAndTablePropertiesConverter(), 
partitionConverter());
   }
 
+  protected Catalog newCatalog(

Review Comment:
   I left this as-is for now. The current protected template method keeps the 
version extension point while still allowing a shared default path in the 
common factory. Making it abstract would force every version module to 
re-declare the method even when the common behavior is still valid.
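
  The trade-off between a protected template method and an abstract one can be 
shown with a minimal sketch. The class names below are illustrative, not the 
actual Gravitino/Flink types:

```java
public class TemplateMethodSketch {
  interface Catalog {
    String name();
  }

  static class CommonFactory {
    // Protected template method: a shared default that version modules may
    // override. If it were abstract, every version module would have to
    // re-declare it even when this default is still valid.
    protected Catalog newCatalog(String name) {
      return () -> "common:" + name;
    }

    public Catalog createCatalog(String name) {
      return newCatalog(name);
    }
  }

  // A version module overrides only when its behavior actually differs.
  static class Flink118Factory extends CommonFactory {
    @Override
    protected Catalog newCatalog(String name) {
      return () -> "v1.18:" + name;
    }
  }

  public static void main(String[] args) {
    System.out.println(new CommonFactory().createCatalog("paimon").name());
    System.out.println(new Flink118Factory().createCatalog("paimon").name());
  }
}
```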



##########
flink-connector/v1.18/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFlink118.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.iceberg;
+
+import java.util.Map;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
+import org.apache.gravitino.flink.connector.utils.CatalogCompat;
+import org.apache.gravitino.flink.connector.utils.CatalogCompatFlink118;
+import org.apache.iceberg.flink.FlinkCatalogFactory;
+
+public class GravitinoIcebergCatalogFlink118 extends GravitinoIcebergCatalog {
+
+  protected GravitinoIcebergCatalogFlink118(
+      String catalogName,
+      String defaultDatabase,
+      SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
+      PartitionConverter partitionConverter,
+      Map<String, String> catalogOptions,
+      Map<String, String> icebergCatalogProperties,
+      CatalogFactory.Context context) {
+    super(

Review Comment:
   Fixed in b68f1e918. The 1.18 catalog implementation does not use 
`CatalogFactory.Context`, so I removed the dead constructor parameter from 
`GravitinoIcebergCatalogFlink118` and kept the versioned factory path aligned 
with the actual usage.



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java:
##########
@@ -93,4 +117,16 @@ public SchemaAndTablePropertiesConverter 
schemaAndTablePropertiesConverter() {
   public PartitionConverter partitionConverter() {
     return DefaultPartitionConverter.INSTANCE;
   }
+
+  private Map<String, String> toIcebergCatalogOptions(Map<String, String> 
catalogOptions) {

Review Comment:
   I kept this method. In the versioned split, the common Iceberg factory still 
needs one place to translate Gravitino catalog properties into the 
Iceberg-specific option set before delegating to the versioned catalog 
implementation. This helper keeps that mapping local and avoids duplicating the 
conversion logic in each version module.
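
  The shape of such a translation helper, sketched with made-up property keys 
(not Gravitino's actual keys or its conversion rules):

```java
import java.util.HashMap;
import java.util.Map;

public class OptionTranslationSketch {
  // Translate one property namespace into the backend's option set in a
  // single place, so version modules never duplicate the mapping.
  static Map<String, String> toBackendOptions(Map<String, String> catalogOptions) {
    Map<String, String> out = new HashMap<>();
    catalogOptions.forEach((key, value) -> {
      if (key.startsWith("gravitino.")) {
        out.put(key.substring("gravitino.".length()), value);
      }
    });
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> in =
        Map.of("gravitino.uri", "http://localhost:8090", "unrelated", "x");
    System.out.println(toBackendOptions(in)); // only the translated keys remain
  }
}
```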



##########
flink-connector/flink-common/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java:
##########
@@ -46,15 +47,25 @@ public org.apache.flink.table.catalog.Catalog 
createCatalog(Context context) {
     context.getOptions().remove(JdbcPropertiesConstants.FLINK_DRIVER);
     final FactoryUtil.CatalogFactoryHelper helper =
         FactoryUtils.createCatalogFactoryHelper(this, context);
+    helper.validate();
     String defaultDatabase =
         
helper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE);
     Preconditions.checkArgument(
         defaultDatabase != null,
         GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should 
not be null.");
-    return new GravitinoJdbcCatalog(
+    return newCatalog(
         context, defaultDatabase, schemaAndTablePropertiesConverter(), 
partitionConverter());
   }
 
+  protected org.apache.flink.table.catalog.Catalog newCatalog(

Review Comment:
   Fixed in b68f1e918. I switched this back to the imported `Catalog` type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
