eolivelli commented on code in PR #20116: URL: https://github.com/apache/pulsar/pull/20116#discussion_r1174842575
########## pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java: ########## @@ -870,6 +872,56 @@ private void setupInput(ContextImpl contextImpl) throws Exception { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } + private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException { + return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType); + } + + static Map<String, Object> parseComponentConfig(String connectorConfigs, + InstanceConfig instanceConfig, + ClassLoader componentClassLoader, + org.apache.pulsar.functions.proto.Function + .FunctionDetails.ComponentType componentType) + throws IOException { + final Map<String, Object> config = ObjectMapperFactory + .getMapper() + .reader() + .forType(new TypeReference<Map<String, Object>>() {}) + .readValue(connectorConfigs); + if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) { + final String configClass; + if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { + configClass = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass(); + } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) { + configClass = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass(); + } else { + return config; + } + if (configClass != null) { + final Object configInstance = Reflections.createInstance(configClass, + Thread.currentThread().getContextClassLoader()); + final List<String> allFields = Review Comment: I am not sure that this is correct. ObjectMapper follows JavaBeans conventions and uses getters/setters together with public fields. 
We should use some ObjectMapper utilities here in order to ensure that we are doing the right thing ########## pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java: ########## @@ -870,6 +872,56 @@ private void setupInput(ContextImpl contextImpl) throws Exception { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } + private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException { + return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType); + } + + static Map<String, Object> parseComponentConfig(String connectorConfigs, + InstanceConfig instanceConfig, + ClassLoader componentClassLoader, + org.apache.pulsar.functions.proto.Function + .FunctionDetails.ComponentType componentType) + throws IOException { + final Map<String, Object> config = ObjectMapperFactory + .getMapper() + .reader() + .forType(new TypeReference<Map<String, Object>>() {}) + .readValue(connectorConfigs); + if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) { + final String configClass; + if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) { + configClass = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass(); + } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) { + configClass = ConnectorUtils + .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass(); + } else { + return config; + } + if (configClass != null) { + final Object configInstance = Reflections.createInstance(configClass, + Thread.currentThread().getContextClassLoader()); + final List<String> allFields = + Reflections + .getAllFields(configInstance.getClass()) + .stream() + .map(Field::getName) + .collect(Collectors.toList()); + + for (String s : config.keySet()) { + 
if (!allFields.contains(s)) { + log.warn("Field '{}' not defined in the {} configuration {}, the field will be ignored", Review Comment: Maybe this should be logged at "ERROR" level; WARNINGs tend to be ignored by alert systems. ########## pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java: ########## @@ -738,6 +738,17 @@ public String getFunctionAuthProviderClassName() { ) private List<String> additionalJavaRuntimeArguments = new ArrayList<>(); + @FieldContext( + category = CATEGORY_CONNECTORS, + doc = "Whether to ignore unknown properties when deserializing the connector configuration. " + + "After upgrading a connector to a new version with a new configuration, " + + "the new configuration may not be compatible with the old connector. " + + "In case of rollback, it's required to also rollback the connector configuration. " + + "Ignoring unknown fields makes possible to keep the new configuration and " + + "only rollback the connector." + ) + private boolean ignoreUnknownConfigFields = false; Review Comment: This configuration applies to the instances of the functions/connectors, and not to the function worker (which already ignores unknown fields). What about 'functionsIgnoreUnknownConfigFields'? (Maybe we can do better, but connectors are actually functions.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org