twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780229275



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, 
ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = 
formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, 
formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, 
formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super 
Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only 
if the original
+         * configuration contains the format config option. It will fail if 
there is a mismatch of
+         * the identifier between the format in originalMap and the one in 
catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' 
specified, while the catalog table has it with value '%s'. "

Review comment:
       Also, say "enriching table" instead of "catalog table" in this message:
   ```
   while the enriching table has it with
   ```

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, 
ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = 
formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, 
formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, 
formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super 
Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only 
if the original
+         * configuration contains the format config option. It will fail if 
there is a mismatch of
+         * the identifier between the format in originalMap and the one in 
catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' 
specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the 
persisted plan table defines the format, "
+                                        + "or both the persisted plan table 
and the catalog table defines the same format",

Review comment:
       Also, say "enriching table" instead of "the catalog table" here:
   
   ```
   and enriching table defines...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to