[GitHub] [flink] rkhachatryan commented on a diff in pull request #20523: [WIP][FLINK-26372][runtime][state] Allow to configure Changelog Storage per program
rkhachatryan commented on code in PR #20523:
URL: https://github.com/apache/flink/pull/20523#discussion_r942373536

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##
@@ -634,14 +634,19 @@ public void writeUserArtifactEntriesToConfiguration() {
         }
     }
-    public void setChangelogStateBackendEnabled(TernaryBoolean changelogStateBackendEnabled) {
+    public void setChangelogStateBackendEnabled(
+            TernaryBoolean changelogStateBackendEnabled, Configuration changelogConfiguration) {
         if (changelogStateBackendEnabled == null
                 || TernaryBoolean.UNDEFINED.equals(changelogStateBackendEnabled)) {
             return;
         }
         this.jobConfiguration.setBoolean(
                 StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION,
                 changelogStateBackendEnabled.getAsBoolean());
+        if (changelogStateBackendEnabled.getOrDefault(false)) {
+            StateChangelogOptionsInternal.putConfiguration(
+                    jobConfiguration, changelogConfiguration);
+        }

Review Comment:
Here, changelog configuration is passed from JM to TM inside the `jobGraph.jobConfiguration`. It is then merged on TM with its "local" TM configuration.

It's not strongly typed (something like `ChangelogConfig`) because different implementations might have different parameters. And it's not a serialized factory because passing a string map seems safer and easier.

It's added to `jobConfiguration` as a serialized value under a single key (rather than merged) because semantically they are different: `jobConfiguration` is internal Flink config, while changelog configuration contains user-facing keys, so their keys might clash.

@zentol WDYT about this approach?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #20523: [WIP][FLINK-26372][runtime][state] Allow to configure Changelog Storage per program
rkhachatryan commented on code in PR #20523:
URL: https://github.com/apache/flink/pull/20523#discussion_r942373004

## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##
@@ -634,14 +634,19 @@ public void writeUserArtifactEntriesToConfiguration() {
         }
     }
-    public void setChangelogStateBackendEnabled(TernaryBoolean changelogStateBackendEnabled) {
+    public void setChangelogStateBackendEnabled(
+            TernaryBoolean changelogStateBackendEnabled, Configuration changelogConfiguration) {
         if (changelogStateBackendEnabled == null
                 || TernaryBoolean.UNDEFINED.equals(changelogStateBackendEnabled)) {
             return;
         }
         this.jobConfiguration.setBoolean(
                 StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION,
                 changelogStateBackendEnabled.getAsBoolean());
+        if (changelogStateBackendEnabled.getOrDefault(false)) {
+            StateChangelogOptionsInternal.putConfiguration(
+                    jobConfiguration, changelogConfiguration);
+        }

Review Comment:
Here, changelog configuration is passed from JM to TM inside the `jobGraph.jobConfiguration`. It is then merged on TM with its "local" TM configuration.

It's not strongly typed (something like `ChangelogConfig`) because different implementations might have different parameters. And it's not a serialized factory because passing a string map seems safer and easier.

It's added to `jobConfiguration` as a serialized value under a single key (rather than merged) because semantically they are different: `jobConfiguration` is internal Flink config, while changelog configuration contains user-facing keys, so their keys might clash.

@zentol WDYT about this approach?
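
To make the idea in the review comment concrete, here is a minimal standalone sketch of "serialize the changelog config under a single key, then merge on the TM". It uses plain `Map<String, String>` in place of Flink's `Configuration`, so it is not the PR's implementation. The class name, the key `CHANGELOG_CONFIG_KEY`, the Base64 encoding, and the rule that job-level values override TM-local ones are all assumptions made for illustration; the PR only says the value is stored under one key and merged on the TM.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class ChangelogConfigSketch {
    // Hypothetical key name; the real one lives in StateChangelogOptionsInternal.
    static final String CHANGELOG_CONFIG_KEY = "internal.changelog.configuration";

    private static String enc(String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    private static String dec(String s) {
        return new String(Base64.getDecoder().decode(s), StandardCharsets.UTF_8);
    }

    // Encode the map as "key:value,key:value" with Base64 parts, so that
    // ':' and ',' can never appear inside an encoded key or value.
    static String serialize(Map<String, String> config) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
            joiner.add(enc(e.getKey()) + ":" + enc(e.getValue()));
        }
        return joiner.toString();
    }

    static Map<String, String> deserialize(String serialized) {
        Map<String, String> config = new HashMap<>();
        if (serialized.isEmpty()) {
            return config;
        }
        for (String entry : serialized.split(",")) {
            String[] kv = entry.split(":", 2);
            config.put(dec(kv[0]), dec(kv[1]));
        }
        return config;
    }

    // JM side: store the whole changelog config under ONE key, so its
    // user-facing keys cannot clash with internal job configuration keys.
    static void putConfiguration(
            Map<String, String> jobConfiguration, Map<String, String> changelogConfiguration) {
        jobConfiguration.put(CHANGELOG_CONFIG_KEY, serialize(changelogConfiguration));
    }

    // TM side: start from the TM-local config and overlay the per-job values.
    // Assumption: job-level settings take precedence over TM-local ones.
    static Map<String, String> mergeOnTaskManager(
            Map<String, String> localTmConfiguration, Map<String, String> jobConfiguration) {
        Map<String, String> merged = new HashMap<>(localTmConfiguration);
        String serialized = jobConfiguration.get(CHANGELOG_CONFIG_KEY);
        if (serialized != null) {
            merged.putAll(deserialize(serialized));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> jobConfiguration = new HashMap<>();
        Map<String, String> changelogConfiguration = new HashMap<>();
        // Illustrative keys and values only.
        changelogConfiguration.put("state.backend.changelog.storage", "filesystem");
        changelogConfiguration.put("dstl.dfs.base-path", "s3://bucket/changelog");
        putConfiguration(jobConfiguration, changelogConfiguration);

        Map<String, String> localTmConfiguration = new HashMap<>();
        localTmConfiguration.put("state.backend.changelog.storage", "memory");
        System.out.println(mergeOnTaskManager(localTmConfiguration, jobConfiguration));
    }
}
```

Because the changelog settings travel as one opaque string, a user-facing key that happens to match an internal `jobConfiguration` key leaves the internal entry untouched, which is the clash the comment is worried about.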