Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm closed pull request #729: [FLINK-33770] Migrate legacy autoscaler config 
keys
URL: https://github.com/apache/flink-kubernetes-operator/pull/729


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm commented on PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#issuecomment-1855734056

   While testing, I noticed that the abstraction in the PR doesn't fully hold 
because there is code in the reconciler which access autoscaler options and 
uses the observe config instead of the migrated config, e.g. 
[here](https://github.com/apache/flink-kubernetes-operator/blob/2136c7a3189753c45e167920bd509a687218a57f/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L190).
 I could fix this but I fear that the abstraction might become leaky again in 
the future.
   
   I'm closing this PR because I want to avoid adding too much complexity to 
the config key handling. 
   
   The current approach to use fallback instead of deprecated keys is fine. 
Fallback keys are logged at INFO, so we can simply set the logging for 
`Configuration` to WARN. See: 
https://github.com/apache/flink-kubernetes-operator/pull/734


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1426558636


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   I changed the PR to not delete them as you suggested. The reason is that 
while testing I found that operating on the Configuration API is dangerous 
(which is why the first version of this PR did intentionally operate directly 
on the internal Map). When you delete a key via `Configuration#removeKey` which 
is a prefix of another key, it will delete both.
   
   We won't get warnings because the deprecated config keys have been removed 
in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1426558636


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   I changed the PR to not delete them as you suggested. The reason is that 
while testing I found that operating on the Configuration API is dangerous 
(which is why the first version of this PR did intentionally operated directly 
on the internal Map). When you delete a key via `Configuration#removeKey` which 
is a prefix of another key, it will delete both.
   
   We won't get warnings because the deprecated config keys have been removed 
in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1425609941


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   Actually I am not sure whether we will get the warn logs if we don't delete 
it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424811105


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +221,36 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+Set allKeys = config.keySet();
+config = new Configuration(config);
+for (String key : allKeys) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+String migratedKey = 
key.substring(LEGACY_CONF_PREFIX.length());
+if (!config.containsKey(migratedKey)) {
+String migratedValue = config.getString(key, null);

Review Comment:
   Just a mark for removing the `@Depreated` for `getString(String key, String 
defaultValue)`, please ignore it for code review.
   
   Here is a 
[thread](https://lists.apache.org/thread/zzsf7glfcdjcjm1hfo1xdwc6jp37nb3m) to 
discuss removing `@Depreated` for `public String getString(String key, String 
defaultValue)` of `Configuration`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424811105


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +221,36 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+Set allKeys = config.keySet();
+config = new Configuration(config);
+for (String key : allKeys) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+String migratedKey = 
key.substring(LEGACY_CONF_PREFIX.length());
+if (!config.containsKey(migratedKey)) {
+String migratedValue = config.getString(key, null);

Review Comment:
   Just a mark for removing the `@Depreated` for `getString(String key, String 
defaultValue)`, please ignore it for code review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#issuecomment-1852520779

   Folks, any more comments? Thanks for your suggestions regarding the 
implementation. I had a good three attempts and I think it looks good now. The 
source of quirkiness is really `Configuration` which has some odd design 
choices.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424363838


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);
+}
+return config;
+}

Review Comment:
   There are a lot of copy steps with this approach because `toMap()` converts 
every single value before it inserts it into the new map. `new 
Configuration(config)` directly passes on the internal map. 
   
   With that in mind, I tried incorporating your code suggestions, i.e. do a 
single pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424306855


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   I believe we should remove the key for clarify since it will be superseded 
by the non-legacy key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


gyfora commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424117478


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);

Review Comment:
   We don't really need to remove the old key if it's not marked as 
deprecated/fallback



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


gyfora commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424115980


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {

Review Comment:
   Can be simplified to `if(!config.containsKey(migratedKey))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424044305


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
-.withFallbackKeys(oldOperatorConfigKey("enabled"))

Review Comment:
   Yes, after migration the fallback keys are not needed anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1424043535


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -247,21 +222,41 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
-
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 
 public static final ConfigOption SCALING_EVENT_INTERVAL =
 autoScalerConfig("scaling.event.interval")
 .durationType()
 .defaultValue(Duration.ofMinutes(30))
-
.withFallbackKeys(oldOperatorConfigKey("scaling.event.interval"))
 .withDescription("Time interval to resend the identical 
event");
 
 public static final ConfigOption FLINK_CLIENT_TIMEOUT =
 autoScalerConfig("flink.rest-client.timeout")
 .durationType()
 .defaultValue(Duration.ofSeconds(10))
-
.withFallbackKeys(oldOperatorConfigKey("flink.rest-client.timeout"))
 .withDescription("The timeout for waiting the flink rest 
client to return.");
+
+/** Migrate config keys still prefixed with the old Kubernetes operator 
prefix. */
+public static Configuration migrateOldConfigKeys(Configuration config) {
+Preconditions.checkNotNull(config);
+config = new Configuration(config);
+
+Set toBeMigrated = new HashSet<>();
+for (String key : config.keySet()) {
+if (key.startsWith(LEGACY_CONF_PREFIX)) {
+toBeMigrated.add(key);
+}
+}
+for (String key : toBeMigrated) {
+String migratedKey = key.substring(LEGACY_CONF_PREFIX.length());
+boolean keyDoesNotExist = config.getString(migratedKey, null) == 
null;
+if (keyDoesNotExist) {
+String migratedValue = 
Preconditions.checkNotNull(config.getString(key, null));
+config.setString(migratedKey, migratedValue);
+}
+config.removeKey(key);
+}
+return config;
+}

Review Comment:
   The O(n) complexity of both approaches is the same. I still prefer the first 
commit in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


mxm commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1423833917


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -65,7 +66,9 @@ public KubernetesJobAutoScalerContext 
getJobAutoScalerContext() {
 }
 
 private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
-Configuration conf = new Configuration(getObserveConfig());
+Configuration conf =
+new KeyMigratingConfiguration(
+AutoScalerOptions.LEGACY_CONF_PREFIX, 
getObserveConfig());

Review Comment:
   Semantically, there is no difference. The reason I chose the subclass is 
that I didn't want to touch any other configs apart from the deprecated 
options. `Configuration` doesn't allow to access the configuration map 
directly. One has to extract the configuration as string which will runs 
through a conversion process.
   
   I just did a quick conversion of the code (see latest commit). I don't 
prefer this approach but let me know if you find the code more readable.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
-.withFallbackKeys(oldOperatorConfigKey("enabled"))

Review Comment:
   The core idea of this fix is to remove deprecated/fallback keys because they 
generate WARN messages in the logs for every key and every reconciliation loop. 
The keys are deprecated in the sense that they are not documented anymore but 
we can't remove them, at least not until much later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on code in PR #729:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/729#discussion_r1423772255


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -65,7 +66,9 @@ public KubernetesJobAutoScalerContext 
getJobAutoScalerContext() {
 }
 
 private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
-Configuration conf = new Configuration(getObserveConfig());
+Configuration conf =
+new KeyMigratingConfiguration(
+AutoScalerOptions.LEGACY_CONF_PREFIX, 
getObserveConfig());

Review Comment:
   How about traversing the `getObserveConfig()` and put all keys and values to 
a new Configuration? We can removing the prefix directly during traversing.
   
   If so, we don't need to introduce the `KeyMigratingConfiguration` class.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -46,107 +45,93 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
-.withFallbackKeys(oldOperatorConfigKey("enabled"))

Review Comment:
   Do we need to remove it? I'm not sure should we keep it to let old users 
know it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33770] Migrate legacy autoscaler config keys [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


mxm opened a new pull request, #729:
URL: https://github.com/apache/flink-kubernetes-operator/pull/729

   The previous PR in #725 made the legacy autoscaler config keys "fallback" 
keys to prevent logging a deprecation WARN message on every reconciliation loop 
in the operator. Turns out, fallback keys also log a warning.
   
   This change moves to migrating all config keys in the legacy 
"kubernetes.operator." before any autoscaler ConfigOptions are used. This 
ensures no warnings will be logged but the older keys can still be used.  The 
new keys always have precedence over the old keys.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org