This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit c576c41742b0844c7506c37c456073a65d60ed09 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Wed Jan 31 17:09:23 2024 +0800 [FLINK-33636][autoscaler] Autoscaler Standalone supports the JdbcAutoScalerEventHandler --- .../autoscaler_standalone_configuration.html | 20 +++-- .../standalone/AutoscalerEventHandlerFactory.java | 88 ++++++++++++++++++++++ .../standalone/AutoscalerStateStoreFactory.java | 17 ++--- .../standalone/StandaloneAutoscalerEntrypoint.java | 5 +- .../config/AutoscalerStandaloneOptions.java | 42 +++++++---- ...java => AutoscalerEventHandlerFactoryTest.java} | 53 ++++++------- .../AutoscalerStateStoreFactoryTest.java | 6 +- 7 files changed, 168 insertions(+), 63 deletions(-) diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html index 4cfdc52f..7d724b7f 100644 --- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html +++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html @@ -20,6 +20,12 @@ <td>Integer</td> <td>The parallelism of autoscaler standalone control loop.</td> </tr> + <tr> + <td><h5>autoscaler.standalone.event-handler.type</h5></td> + <td style="word-wrap: break-word;">LOGGING</td> + <td><p>Enum</p></td> + <td>The autoscaler event handler type.<br /><br />Possible values:<ul><li>"LOGGING": The event handler based on logging.</li><li>"JDBC": The event handler which persists all events in JDBC related database. It's recommended in production.</li></ul></td> + </tr> <tr> <td><h5>autoscaler.standalone.fetcher.flink-cluster.host</h5></td> <td style="word-wrap: break-word;">"localhost"</td> @@ -33,22 +39,22 @@ <td>The port of flink cluster when the flink-cluster fetcher is used.</td> </tr> <tr> - <td><h5>autoscaler.standalone.state-store.jdbc.password-env-variable</h5></td> - <td style="word-wrap: break-word;">"STATE_STORE_JDBC_PWD"</td> + <td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td> + <td style="word-wrap: break-word;">"JDBC_PWD"</td> <td>String</td> - <td>The environment variable name of jdbc state store password when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable.</td> + <td>The environment variable name of jdbc password when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable.</td> </tr> <tr> - <td><h5>autoscaler.standalone.state-store.jdbc.url</h5></td> + <td><h5>autoscaler.standalone.jdbc.url</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> - <td>The jdbc url of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>, such as: <code class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br />This option is required when using JDBC state store.</td> + <td>The jdbc url when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>, such as: <code class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br />This option is required when using JDBC state store or JDBC event handler.</td> </tr> <tr> - <td><h5>autoscaler.standalone.state-store.jdbc.username</h5></td> + <td><h5>autoscaler.standalone.jdbc.username</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> - <td>The jdbc username of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>.</td> + <td>The jdbc username when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>.</td> </tr> <tr> <td><h5>autoscaler.standalone.state-store.type</h5></td> diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java new file mode 100644 index 00000000..cc69c453 --- /dev/null +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.standalone; + +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.event.LoggingEventHandler; +import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler; +import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; + +import java.sql.DriverManager; + +import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC; +import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME; +import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The factory of {@link AutoScalerEventHandler}. */ +public class AutoscalerEventHandlerFactory { + + /** Out-of-box event handler type. */ + public enum EventHandlerType implements DescribedEnum { + LOGGING("The event handler based on logging."), + JDBC( + "The event handler which persists all events in JDBC related database. It's recommended in production."); + + private final InlineElement description; + + EventHandlerType(String description) { + this.description = text(description); + } + + @Override + public InlineElement getDescription() { + return description; + } + } + + public static <KEY, Context extends JobAutoScalerContext<KEY>> + AutoScalerEventHandler<KEY, Context> create(Configuration conf) throws Exception { + var eventHandlerType = conf.get(EVENT_HANDLER_TYPE); + switch (eventHandlerType) { + case LOGGING: + return new LoggingEventHandler<>(); + case JDBC: + return createJdbcEventHandler(conf); + default: + throw new IllegalArgumentException( + String.format( + "Unknown event handler type : %s. Optional event handlers are: %s and %s.", + eventHandlerType, LOGGING, JDBC)); + } + } + + private static <KEY, Context extends JobAutoScalerContext<KEY>> + AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf) + throws Exception { + final var jdbcUrl = conf.get(JDBC_URL); + checkArgument(jdbcUrl != null, "%s is required for jdbc event handler.", JDBC_URL.key()); + var user = conf.get(JDBC_USERNAME); + var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE)); + + var conn = DriverManager.getConnection(jdbcUrl, user, password); + return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn)); + } +} diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java index 4b0ed170..d5a62bdf 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java @@ -31,9 +31,9 @@ import java.sql.DriverManager; import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_USERNAME; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE; import static org.apache.flink.configuration.description.TextElement.text; import static org.apache.flink.util.Preconditions.checkArgument; @@ -79,13 +79,10 @@ public class AutoscalerStateStoreFactory { private static <KEY, Context extends JobAutoScalerContext<KEY>> AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf) throws Exception { - final var jdbcUrl = conf.get(STATE_STORE_JDBC_URL); - checkArgument( - jdbcUrl != null, - "%s is required for jdbc state store.", - STATE_STORE_JDBC_URL.key()); - var user = conf.get(STATE_STORE_JDBC_USERNAME); - var password = System.getenv().get(conf.get(STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE)); + final var jdbcUrl = conf.get(JDBC_URL); + checkArgument(jdbcUrl != null, "%s is required for jdbc state store.", JDBC_URL.key()); + var user = conf.get(JDBC_USERNAME); + var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE)); var conn = DriverManager.getConnection(jdbcUrl, user, password); return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn))); diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java index 35eba18c..6d8a495e 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java @@ -26,7 +26,6 @@ import org.apache.flink.autoscaler.RestApiMetricsCollector; import org.apache.flink.autoscaler.ScalingExecutor; import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; -import org.apache.flink.autoscaler.event.LoggingEventHandler; import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher; import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer; import org.apache.flink.autoscaler.state.AutoScalerStateStore; @@ -53,10 +52,12 @@ public class StandaloneAutoscalerEntrypoint { LOG.info("The standalone autoscaler is started, configuration: {}", conf); // Initialize JobListFetcher and JobAutoScaler. - var eventHandler = new LoggingEventHandler<KEY, Context>(); JobListFetcher<KEY, Context> jobListFetcher = createJobListFetcher(conf); AutoScalerStateStore<KEY, Context> stateStore = AutoscalerStateStoreFactory.create(conf); + AutoScalerEventHandler<KEY, Context> eventHandler = + AutoscalerEventHandlerFactory.create(conf); + var autoScaler = createJobAutoscaler(eventHandler, stateStore); var autoscalerExecutor = diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java index 510042fe..edc30549 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.standalone.config; +import org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType; import org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -24,7 +25,6 @@ import org.apache.flink.configuration.description.Description; import java.time.Duration; -import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; import static org.apache.flink.configuration.description.TextElement.code; /** Config options related to the autoscaler standalone module. */ @@ -71,42 +71,54 @@ public class AutoscalerStandaloneOptions { .defaultValue(StateStoreType.MEMORY) .withDescription("The autoscaler state store type."); - public static final ConfigOption<String> STATE_STORE_JDBC_URL = - autoscalerStandaloneConfig("state-store.jdbc.url") + public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE = + autoscalerStandaloneConfig("event-handler.type") + .enumType(EventHandlerType.class) + .defaultValue(EventHandlerType.LOGGING) + .withDescription("The autoscaler event handler type."); + + public static final ConfigOption<String> JDBC_URL = + autoscalerStandaloneConfig("jdbc.url") .stringType() .noDefaultValue() .withDescription( Description.builder() .text( - "The jdbc url of jdbc state store when %s has been set to %s, such as: %s.", + "The jdbc url when %s or %s has been set to %s, such as: %s.", code(STATE_STORE_TYPE.key()), - code(JDBC.toString()), + code(EVENT_HANDLER_TYPE.key()), + code("JDBC"), code("jdbc:mysql://localhost:3306/flink_autoscaler")) .linebreak() - .text("This option is required when using JDBC state store.") + .text( + "This option is required when using JDBC state store or JDBC event handler.") .build()); - public static final ConfigOption<String> STATE_STORE_JDBC_USERNAME = - autoscalerStandaloneConfig("state-store.jdbc.username") + public static final ConfigOption<String> JDBC_USERNAME = + autoscalerStandaloneConfig("jdbc.username") .stringType() .noDefaultValue() .withDescription( Description.builder() .text( - "The jdbc username of jdbc state store when %s has been set to %s.", - code(STATE_STORE_TYPE.key()), code(JDBC.toString())) + "The jdbc username when %s or %s has been set to %s.", + code(STATE_STORE_TYPE.key()), + code(EVENT_HANDLER_TYPE.key()), + code("JDBC")) .build()); - public static final ConfigOption<String> STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE = - autoscalerStandaloneConfig("state-store.jdbc.password-env-variable") + public static final ConfigOption<String> JDBC_PASSWORD_ENV_VARIABLE = + autoscalerStandaloneConfig("jdbc.password-env-variable") .stringType() - .defaultValue("STATE_STORE_JDBC_PWD") + .defaultValue("JDBC_PWD") .withDescription( Description.builder() .text( - "The environment variable name of jdbc state store password when %s has been set to %s. " + "The environment variable name of jdbc password when %s or %s has been set to %s. " + "In general, the environment variable name doesn't need to be changed. Users need to " + "export the password using this environment variable.", - code(STATE_STORE_TYPE.key()), code(JDBC.toString())) + code(STATE_STORE_TYPE.key()), + code(EVENT_HANDLER_TYPE.key()), + code("JDBC")) .build()); } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java similarity index 52% copy from flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java copy to flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java index 01f62d5c..4c76937b 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java @@ -17,8 +17,8 @@ package org.apache.flink.autoscaler.standalone; -import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore; -import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.autoscaler.event.LoggingEventHandler; +import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -26,53 +26,54 @@ import org.junit.jupiter.api.Test; import java.sql.DriverManager; import java.sql.SQLException; -import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; -import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE; +import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC; +import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -class AutoscalerStateStoreFactoryTest { +/** Test for {@link AutoscalerEventHandlerFactory}. */ +class AutoscalerEventHandlerFactoryTest { @Test - void testCreateDefaultStateStore() throws Exception { - // Test for memory state store is created by default. - var stateStore = AutoscalerStateStoreFactory.create(new Configuration()); - assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class); + void testCreateDefaultEventHandler() throws Exception { + // Test for logging event handler is created by default. + var eventHandler = AutoscalerEventHandlerFactory.create(new Configuration()); + assertThat(eventHandler).isInstanceOf(LoggingEventHandler.class); } @Test - void testCreateInMemoryStateStore() throws Exception { - // Test for memory state store is created explicitly. + void testCreateInMemoryEventHandler() throws Exception { + // Test for logging event handler is created explicitly. final var conf = new Configuration(); - conf.set(STATE_STORE_TYPE, MEMORY); - var stateStore = AutoscalerStateStoreFactory.create(conf); - assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class); + conf.set(EVENT_HANDLER_TYPE, LOGGING); + var eventHandler = AutoscalerEventHandlerFactory.create(conf); + assertThat(eventHandler).isInstanceOf(LoggingEventHandler.class); } @Test - void testCreateJdbcStateStoreWithoutURL() { + void testCreateJdbcEventHandlerWithoutURL() { // Test for missing the jdbc url. final var conf = new Configuration(); - conf.set(STATE_STORE_TYPE, JDBC); - assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf)) + conf.set(EVENT_HANDLER_TYPE, JDBC); + assertThatThrownBy(() -> AutoscalerEventHandlerFactory.create(conf)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("%s is required for jdbc state store.", STATE_STORE_JDBC_URL.key()); + .hasMessage("%s is required for jdbc event handler.", JDBC_URL.key()); } @Test - void testCreateJdbcStateStore() throws Exception { + void testCreateJdbcEventHandler() throws Exception { final var jdbcUrl = "jdbc:derby:memory:test"; DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close(); - // Test for create JDBC State store. + // Test for create JDBC Event Handler. final var conf = new Configuration(); - conf.set(STATE_STORE_TYPE, JDBC); - conf.set(STATE_STORE_JDBC_URL, jdbcUrl); + conf.set(EVENT_HANDLER_TYPE, JDBC); + conf.set(JDBC_URL, jdbcUrl); - var stateStore = AutoscalerStateStoreFactory.create(conf); - assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class); + var eventHandler = AutoscalerEventHandlerFactory.create(conf); + assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class); try { DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close(); diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java index 01f62d5c..0939ea06 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java @@ -28,7 +28,7 @@ import java.sql.SQLException; import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC; import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY; -import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -58,7 +58,7 @@ class AutoscalerStateStoreFactoryTest { conf.set(STATE_STORE_TYPE, JDBC); assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("%s is required for jdbc state store.", STATE_STORE_JDBC_URL.key()); + .hasMessage("%s is required for jdbc state store.", JDBC_URL.key()); } @Test @@ -69,7 +69,7 @@ class AutoscalerStateStoreFactoryTest { // Test for create JDBC State store. final var conf = new Configuration(); conf.set(STATE_STORE_TYPE, JDBC); - conf.set(STATE_STORE_JDBC_URL, jdbcUrl); + conf.set(JDBC_URL, jdbcUrl); var stateStore = AutoscalerStateStoreFactory.create(conf); assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);