Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm closed pull request #729: [FLINK-33770] Migrate legacy autoscaler config keys URL: https://github.com/apache/flink-kubernetes-operator/pull/729 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#issuecomment-1855734056 While testing, I noticed that the abstraction in the PR doesn't fully hold because there is code in the reconciler which access autoscaler options and uses the observe config instead of the migrated config, e.g. [here](https://github.com/apache/flink-kubernetes-operator/blob/2136c7a3189753c45e167920bd509a687218a57f/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L190). I could fix this but I fear that the abstraction might become leaky again in the future. I'm closing this PR because I want to avoid adding too much complexity to the config key handling. The current approach to use fallback instead of deprecated keys is fine. Fallback keys are logged at INFO, so we can simply set the logging for `Configuration` to WARN. See: https://github.com/apache/flink-kubernetes-operator/pull/734 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1426558636 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); Review Comment: I changed the PR to not delete them as you suggested. The reason is that while testing I found that operating on the Configuration API is dangerous (which is why the first version of this PR did intentionally operate directly on the internal Map). When you delete a key via `Configuration#removeKey` which is a prefix of another key, it will delete both. We won't get warnings because the deprecated config keys have been removed in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1426558636 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); Review Comment: I changed the PR to not delete them as you suggested. The reason is that while testing I found that operating on the Configuration API is dangerous (which is why the first version of this PR did intentionally operated directly on the internal Map). When you delete a key via `Configuration#removeKey` which is a prefix of another key, it will delete both. We won't get warnings because the deprecated config keys have been removed in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
gyfora commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1425609941 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); Review Comment: Actually I am not sure whether we will get the warn logs if we don't delete it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
1996fanrui commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424811105 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +221,36 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +Set allKeys = config.keySet(); +config = new Configuration(config); +for (String key : allKeys) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +if (!config.containsKey(migratedKey)) { +String migratedValue = config.getString(key, null); Review Comment: Just a mark for removing the `@Depreated` for `getString(String key, String defaultValue)`, please ignore it for code review. Here is a [thread](https://lists.apache.org/thread/zzsf7glfcdjcjm1hfo1xdwc6jp37nb3m) to discuss removing `@Depreated` for `public String getString(String key, String defaultValue)` of `Configuration`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
1996fanrui commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424811105 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +221,36 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +Set allKeys = config.keySet(); +config = new Configuration(config); +for (String key : allKeys) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +if (!config.containsKey(migratedKey)) { +String migratedValue = config.getString(key, null); Review Comment: Just a mark for removing the `@Depreated` for `getString(String key, String defaultValue)`, please ignore it for code review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#issuecomment-1852520779 Folks, any more comments? Thanks for your suggestions regarding the implementation. I had a good three attempts and I think it looks good now. The source of quirkiness is really `Configuration` which has some odd design choices. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424363838 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); +} +return config; +} Review Comment: There are a lot of copy steps with this approach because `toMap()` converts every single value before it inserts it into the new map. `new Configuration(config)` directly passes on the internal map. With that in mind, I tried incorporating your code suggestions, i.e. do a single pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424306855 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); Review Comment: I believe we should remove the key for clarify since it will be superseded by the non-legacy key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
gyfora commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424117478 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); Review Comment: We don't really need to remove the old key if it's not marked as deprecated/fallback -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
gyfora commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424115980 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { Review Comment: Can be simplified to `if(!config.containsKey(migratedKey))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424044305 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { autoScalerConfig("enabled") .booleanType() .defaultValue(false) -.withFallbackKeys(oldOperatorConfigKey("enabled")) Review Comment: Yes, after migration the fallback keys are not needed anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424043535 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .stringType() .asList() .defaultValues() - .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() .defaultValue(Duration.ofMinutes(30)) - .withFallbackKeys(oldOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); public static final ConfigOption FLINK_CLIENT_TIMEOUT = autoScalerConfig("flink.rest-client.timeout") .durationType() .defaultValue(Duration.ofSeconds(10)) - .withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout")) .withDescription("The timeout for waiting the flink rest client to return."); + +/** Migrate config keys still prefixed with the old Kubernetes operator prefix. */ +public static Configuration migrateOldConfigKeys(Configuration config) { +Preconditions.checkNotNull(config); +config = new Configuration(config); + +Set toBeMigrated = new HashSet<>(); +for (String key : config.keySet()) { +if (key.startsWith(LEGACY_CONF_PREFIX)) { +toBeMigrated.add(key); +} +} +for (String key : toBeMigrated) { +String migratedKey = key.substring(LEGACY_CONF_PREFIX.length()); +boolean keyDoesNotExist = config.getString(migratedKey, null) == null; +if (keyDoesNotExist) { +String migratedValue = Preconditions.checkNotNull(config.getString(key, null)); +config.setString(migratedKey, migratedValue); +} +config.removeKey(key); +} +return config; +} Review Comment: The O(n) complexity of both approaches is the same. I still prefer the first commit in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1423833917 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ## @@ -65,7 +66,9 @@ public KubernetesJobAutoScalerContext getJobAutoScalerContext() { } private KubernetesJobAutoScalerContext createJobAutoScalerContext() { -Configuration conf = new Configuration(getObserveConfig()); +Configuration conf = +new KeyMigratingConfiguration( +AutoScalerOptions.LEGACY_CONF_PREFIX, getObserveConfig()); Review Comment: Semantically, there is no difference. The reason I chose the subclass is that I didn't want to touch any other configs apart from the deprecated options. `Configuration` doesn't allow to access the configuration map directly. One has to extract the configuration as string which will runs through a conversion process. I just did a quick conversion of the code (see latest commit). I don't prefer this approach but let me know if you find the code more readable. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { autoScalerConfig("enabled") .booleanType() .defaultValue(false) -.withFallbackKeys(oldOperatorConfigKey("enabled")) Review Comment: The core idea of this fix is to remove deprecated/fallback keys because they generate WARN messages in the logs for every key and every reconciliation loop. The keys are deprecated in the sense that they are not documented anymore but we can't remove them, at least not until much later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
1996fanrui commented on code in PR #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1423772255 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java: ## @@ -65,7 +66,9 @@ public KubernetesJobAutoScalerContext getJobAutoScalerContext() { } private KubernetesJobAutoScalerContext createJobAutoScalerContext() { -Configuration conf = new Configuration(getObserveConfig()); +Configuration conf = +new KeyMigratingConfiguration( +AutoScalerOptions.LEGACY_CONF_PREFIX, getObserveConfig()); Review Comment: How about traversing the `getObserveConfig()` and put all keys and values to a new Configuration? We can removing the prefix directly during traversing. If so, we don't need to introduce the `KeyMigratingConfiguration` class. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { autoScalerConfig("enabled") .booleanType() .defaultValue(false) -.withFallbackKeys(oldOperatorConfigKey("enabled")) Review Comment: Do we need to remove it? I'm not sure should we keep it to let old users know it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]
mxm opened a new pull request, #729: URL: https://github.com/apache/flink-kubernetes-operator/pull/729 The previous PR in #725 made the legacy autoscaler config keys "fallback" keys to prevent logging a deprecation WARN message on every reconciliation loop in the operator. Turns out, fallback keys also log a warning. This change moves to migrating all config keys in the legacy "kubernetes.operator." before any autoscaler ConfigOptions are used. This ensures no warnings will be logged but the older keys can still be used. The new keys always have precedence over the old keys. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org