This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit c576c41742b0844c7506c37c456073a65d60ed09
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Wed Jan 31 17:09:23 2024 +0800

    [FLINK-33636][autoscaler] Autoscaler Standalone supports the JdbcAutoScalerEventHandler
---
 .../autoscaler_standalone_configuration.html       | 20 +++--
 .../standalone/AutoscalerEventHandlerFactory.java  | 88 ++++++++++++++++++++++
 .../standalone/AutoscalerStateStoreFactory.java    | 17 ++---
 .../standalone/StandaloneAutoscalerEntrypoint.java |  5 +-
 .../config/AutoscalerStandaloneOptions.java        | 42 +++++++----
 ...java => AutoscalerEventHandlerFactoryTest.java} | 53 ++++++-------
 .../AutoscalerStateStoreFactoryTest.java           |  6 +-
 7 files changed, 168 insertions(+), 63 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
index 4cfdc52f..7d724b7f 100644
--- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
+++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
@@ -20,6 +20,12 @@
             <td>Integer</td>
             <td>The parallelism of autoscaler standalone control loop.</td>
         </tr>
+        <tr>
+            <td><h5>autoscaler.standalone.event-handler.type</h5></td>
+            <td style="word-wrap: break-word;">LOGGING</td>
+            <td><p>Enum</p></td>
+            <td>The autoscaler event handler type.<br /><br />Possible values:<ul><li>"LOGGING": The event handler based on logging.</li><li>"JDBC": The event handler which persists all events in JDBC related database. It's recommended in production.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>autoscaler.standalone.fetcher.flink-cluster.host</h5></td>
             <td style="word-wrap: break-word;">"localhost"</td>
@@ -33,22 +39,22 @@
             <td>The port of flink cluster when the flink-cluster fetcher is used.</td>
         </tr>
         <tr>
-            <td><h5>autoscaler.standalone.state-store.jdbc.password-env-variable</h5></td>
-            <td style="word-wrap: break-word;">"STATE_STORE_JDBC_PWD"</td>
+            <td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td>
+            <td style="word-wrap: break-word;">"JDBC_PWD"</td>
             <td>String</td>
-            <td>The environment variable name of jdbc state store password when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable.</td>
+            <td>The environment variable name of jdbc password when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>. In general, the environment variable name doesn't need to be changed. Users need to export the password using this environment variable.</td>
         </tr>
         <tr>
-            <td><h5>autoscaler.standalone.state-store.jdbc.url</h5></td>
+            <td><h5>autoscaler.standalone.jdbc.url</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The jdbc url of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>, such as: <code class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br />This option is required when using JDBC state store.</td>
+            <td>The jdbc url when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>, such as: <code class="highlighter-rouge">jdbc:mysql://localhost:3306/flink_autoscaler</code>.<br />This option is required when using JDBC state store or JDBC event handler.</td>
         </tr>
         <tr>
-            <td><h5>autoscaler.standalone.state-store.jdbc.username</h5></td>
+            <td><h5>autoscaler.standalone.jdbc.username</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>The jdbc username of jdbc state store when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> has been set to <code class="highlighter-rouge">JDBC</code>.</td>
+            <td>The jdbc username when <code class="highlighter-rouge">autoscaler.standalone.state-store.type</code> or <code class="highlighter-rouge">autoscaler.standalone.event-handler.type</code> has been set to <code class="highlighter-rouge">JDBC</code>.</td>
         </tr>
         <tr>
             <td><h5>autoscaler.standalone.state-store.type</h5></td>
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
new file mode 100644
index 00000000..cc69c453
--- /dev/null
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.event.LoggingEventHandler;
+import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
+import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import java.sql.DriverManager;
+
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The factory of {@link AutoScalerEventHandler}. */
+public class AutoscalerEventHandlerFactory {
+
+    /** Out-of-box event handler type. */
+    public enum EventHandlerType implements DescribedEnum {
+        LOGGING("The event handler based on logging."),
+        JDBC(
+                "The event handler which persists all events in JDBC related 
database. It's recommended in production.");
+
+        private final InlineElement description;
+
+        EventHandlerType(String description) {
+            this.description = text(description);
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            AutoScalerEventHandler<KEY, Context> create(Configuration conf) 
throws Exception {
+        var eventHandlerType = conf.get(EVENT_HANDLER_TYPE);
+        switch (eventHandlerType) {
+            case LOGGING:
+                return new LoggingEventHandler<>();
+            case JDBC:
+                return createJdbcEventHandler(conf);
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown event handler type : %s. Optional 
event handlers are: %s and %s.",
+                                eventHandlerType, LOGGING, JDBC));
+        }
+    }
+
+    private static <KEY, Context extends JobAutoScalerContext<KEY>>
+            AutoScalerEventHandler<KEY, Context> 
createJdbcEventHandler(Configuration conf)
+                    throws Exception {
+        final var jdbcUrl = conf.get(JDBC_URL);
+        checkArgument(jdbcUrl != null, "%s is required for jdbc event 
handler.", JDBC_URL.key());
+        var user = conf.get(JDBC_USERNAME);
+        var password = 
System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));
+
+        var conn = DriverManager.getConnection(jdbcUrl, user, password);
+        return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
+    }
+}
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
index 4b0ed170..d5a62bdf 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java
@@ -31,9 +31,9 @@ import java.sql.DriverManager;
 
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_USERNAME;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -79,13 +79,10 @@ public class AutoscalerStateStoreFactory {
     private static <KEY, Context extends JobAutoScalerContext<KEY>>
             AutoScalerStateStore<KEY, Context> 
createJdbcStateStore(Configuration conf)
                     throws Exception {
-        final var jdbcUrl = conf.get(STATE_STORE_JDBC_URL);
-        checkArgument(
-                jdbcUrl != null,
-                "%s is required for jdbc state store.",
-                STATE_STORE_JDBC_URL.key());
-        var user = conf.get(STATE_STORE_JDBC_USERNAME);
-        var password = 
System.getenv().get(conf.get(STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE));
+        final var jdbcUrl = conf.get(JDBC_URL);
+        checkArgument(jdbcUrl != null, "%s is required for jdbc state store.", 
JDBC_URL.key());
+        var user = conf.get(JDBC_USERNAME);
+        var password = 
System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));
 
         var conn = DriverManager.getConnection(jdbcUrl, user, password);
         return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new 
JdbcStateInteractor(conn)));
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
index 35eba18c..6d8a495e 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
@@ -26,7 +26,6 @@ import org.apache.flink.autoscaler.RestApiMetricsCollector;
 import org.apache.flink.autoscaler.ScalingExecutor;
 import org.apache.flink.autoscaler.ScalingMetricEvaluator;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
-import org.apache.flink.autoscaler.event.LoggingEventHandler;
 import 
org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
 import 
org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
@@ -53,10 +52,12 @@ public class StandaloneAutoscalerEntrypoint {
         LOG.info("The standalone autoscaler is started, configuration: {}", 
conf);
 
         // Initialize JobListFetcher and JobAutoScaler.
-        var eventHandler = new LoggingEventHandler<KEY, Context>();
         JobListFetcher<KEY, Context> jobListFetcher = 
createJobListFetcher(conf);
 
         AutoScalerStateStore<KEY, Context> stateStore = 
AutoscalerStateStoreFactory.create(conf);
+        AutoScalerEventHandler<KEY, Context> eventHandler =
+                AutoscalerEventHandlerFactory.create(conf);
+
         var autoScaler = createJobAutoscaler(eventHandler, stateStore);
 
         var autoscalerExecutor =
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
index 510042fe..edc30549 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.standalone.config;
 
+import 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType;
 import 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -24,7 +25,6 @@ import org.apache.flink.configuration.description.Description;
 
 import java.time.Duration;
 
-import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
 import static org.apache.flink.configuration.description.TextElement.code;
 
 /** Config options related to the autoscaler standalone module. */
@@ -71,42 +71,54 @@ public class AutoscalerStandaloneOptions {
                     .defaultValue(StateStoreType.MEMORY)
                     .withDescription("The autoscaler state store type.");
 
-    public static final ConfigOption<String> STATE_STORE_JDBC_URL =
-            autoscalerStandaloneConfig("state-store.jdbc.url")
+    public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE =
+            autoscalerStandaloneConfig("event-handler.type")
+                    .enumType(EventHandlerType.class)
+                    .defaultValue(EventHandlerType.LOGGING)
+                    .withDescription("The autoscaler event handler type.");
+
+    public static final ConfigOption<String> JDBC_URL =
+            autoscalerStandaloneConfig("jdbc.url")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The jdbc url of jdbc state store 
when %s has been set to %s, such as: %s.",
+                                            "The jdbc url when %s or %s has 
been set to %s, such as: %s.",
                                             code(STATE_STORE_TYPE.key()),
-                                            code(JDBC.toString()),
+                                            code(EVENT_HANDLER_TYPE.key()),
+                                            code("JDBC"),
                                             
code("jdbc:mysql://localhost:3306/flink_autoscaler"))
                                     .linebreak()
-                                    .text("This option is required when using 
JDBC state store.")
+                                    .text(
+                                            "This option is required when 
using JDBC state store or JDBC event handler.")
                                     .build());
 
-    public static final ConfigOption<String> STATE_STORE_JDBC_USERNAME =
-            autoscalerStandaloneConfig("state-store.jdbc.username")
+    public static final ConfigOption<String> JDBC_USERNAME =
+            autoscalerStandaloneConfig("jdbc.username")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The jdbc username of jdbc state 
store when %s has been set to %s.",
-                                            code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
+                                            "The jdbc username when %s or %s 
has been set to %s.",
+                                            code(STATE_STORE_TYPE.key()),
+                                            code(EVENT_HANDLER_TYPE.key()),
+                                            code("JDBC"))
                                     .build());
 
-    public static final ConfigOption<String> 
STATE_STORE_JDBC_PASSWORD_ENV_VARIABLE =
-            
autoscalerStandaloneConfig("state-store.jdbc.password-env-variable")
+    public static final ConfigOption<String> JDBC_PASSWORD_ENV_VARIABLE =
+            autoscalerStandaloneConfig("jdbc.password-env-variable")
                     .stringType()
-                    .defaultValue("STATE_STORE_JDBC_PWD")
+                    .defaultValue("JDBC_PWD")
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The environment variable name of 
jdbc state store password when %s has been set to %s. "
+                                            "The environment variable name of 
jdbc password when %s or %s has been set to %s. "
                                                     + "In general, the 
environment variable name doesn't need to be changed. Users need to "
                                                     + "export the password 
using this environment variable.",
-                                            code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
+                                            code(STATE_STORE_TYPE.key()),
+                                            code(EVENT_HANDLER_TYPE.key()),
+                                            code("JDBC"))
                                     .build());
 }
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
similarity index 52%
copy from flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
copy to flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
index 01f62d5c..4c76937b 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.autoscaler.standalone;
 
-import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
-import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
+import org.apache.flink.autoscaler.event.LoggingEventHandler;
+import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
 import org.apache.flink.configuration.Configuration;
 
 import org.junit.jupiter.api.Test;
@@ -26,53 +26,54 @@ import org.junit.jupiter.api.Test;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
-import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
-import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
+import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-class AutoscalerStateStoreFactoryTest {
+/** Test for {@link AutoscalerEventHandlerFactory}. */
+class AutoscalerEventHandlerFactoryTest {
 
     @Test
-    void testCreateDefaultStateStore() throws Exception {
-        // Test for memory state store is created by default.
-        var stateStore = AutoscalerStateStoreFactory.create(new 
Configuration());
-        
assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class);
+    void testCreateDefaultEventHandler() throws Exception {
+        // Test for logging event handler is created by default.
+        var eventHandler = AutoscalerEventHandlerFactory.create(new 
Configuration());
+        assertThat(eventHandler).isInstanceOf(LoggingEventHandler.class);
     }
 
     @Test
-    void testCreateInMemoryStateStore() throws Exception {
-        // Test for memory state store is created explicitly.
+    void testCreateInMemoryEventHandler() throws Exception {
+        // Test for logging event handler is created explicitly.
         final var conf = new Configuration();
-        conf.set(STATE_STORE_TYPE, MEMORY);
-        var stateStore = AutoscalerStateStoreFactory.create(conf);
-        
assertThat(stateStore).isInstanceOf(InMemoryAutoScalerStateStore.class);
+        conf.set(EVENT_HANDLER_TYPE, LOGGING);
+        var eventHandler = AutoscalerEventHandlerFactory.create(conf);
+        assertThat(eventHandler).isInstanceOf(LoggingEventHandler.class);
     }
 
     @Test
-    void testCreateJdbcStateStoreWithoutURL() {
+    void testCreateJdbcEventHandlerWithoutURL() {
         // Test for missing the jdbc url.
         final var conf = new Configuration();
-        conf.set(STATE_STORE_TYPE, JDBC);
-        assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
+        conf.set(EVENT_HANDLER_TYPE, JDBC);
+        assertThatThrownBy(() -> AutoscalerEventHandlerFactory.create(conf))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("%s is required for jdbc state store.", 
STATE_STORE_JDBC_URL.key());
+                .hasMessage("%s is required for jdbc event handler.", 
JDBC_URL.key());
     }
 
     @Test
-    void testCreateJdbcStateStore() throws Exception {
+    void testCreateJdbcEventHandler() throws Exception {
         final var jdbcUrl = "jdbc:derby:memory:test";
         DriverManager.getConnection(String.format("%s;create=true", 
jdbcUrl)).close();
 
-        // Test for create JDBC State store.
+        // Test for create JDBC Event Handler.
         final var conf = new Configuration();
-        conf.set(STATE_STORE_TYPE, JDBC);
-        conf.set(STATE_STORE_JDBC_URL, jdbcUrl);
+        conf.set(EVENT_HANDLER_TYPE, JDBC);
+        conf.set(JDBC_URL, jdbcUrl);
 
-        var stateStore = AutoscalerStateStoreFactory.create(conf);
-        assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);
+        var eventHandler = AutoscalerEventHandlerFactory.create(conf);
+        
assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class);
 
         try {
             DriverManager.getConnection(String.format("%s;shutdown=true", 
jdbcUrl)).close();
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
index 01f62d5c..0939ea06 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java
@@ -28,7 +28,7 @@ import java.sql.SQLException;
 
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
-import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_JDBC_URL;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -58,7 +58,7 @@ class AutoscalerStateStoreFactoryTest {
         conf.set(STATE_STORE_TYPE, JDBC);
         assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("%s is required for jdbc state store.", 
STATE_STORE_JDBC_URL.key());
+                .hasMessage("%s is required for jdbc state store.", 
JDBC_URL.key());
     }
 
     @Test
@@ -69,7 +69,7 @@ class AutoscalerStateStoreFactoryTest {
         // Test for create JDBC State store.
         final var conf = new Configuration();
         conf.set(STATE_STORE_TYPE, JDBC);
-        conf.set(STATE_STORE_JDBC_URL, jdbcUrl);
+        conf.set(JDBC_URL, jdbcUrl);
 
         var stateStore = AutoscalerStateStoreFactory.create(conf);
         assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);

Reply via email to