This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 9fda27a855549d9e3d132523642f732a3a554354
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Wed Jan 31 17:07:44 2024 +0800

    [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler
---
 flink-autoscaler-plugin-jdbc/pom.xml               |   7 +
 .../autoscaler/jdbc/event/AutoScalerEvent.java}    |  43 +-
 .../jdbc/event/JdbcAutoScalerEventHandler.java     | 115 +++++
 .../autoscaler/jdbc/event/JdbcEventInteractor.java | 155 ++++++
 .../src/main/resources/schema/derby_schema.sql     |  19 +
 .../src/main/resources/schema/mysql_schema.sql     |  16 +
 .../src/main/resources/schema/postgres_schema.sql  |  16 +
 .../AbstractJdbcAutoscalerEventHandlerITCase.java  | 550 +++++++++++++++++++++
 .../event/AbstractJdbcEventInteractorITCase.java   | 147 ++++++
 .../jdbc/event/CountableJdbcEventInteractor.java   |  73 +++
 .../testutils/databases/derby/DerbyExtension.java  |  31 +-
 .../test/resources/test_schema/mysql_schema.sql    |  15 +
 .../test/resources/test_schema/postgres_schema.sql |  16 +
 13 files changed, 1186 insertions(+), 17 deletions(-)

diff --git a/flink-autoscaler-plugin-jdbc/pom.xml 
b/flink-autoscaler-plugin-jdbc/pom.xml
index 097d4f22..ccef24d4 100644
--- a/flink-autoscaler-plugin-jdbc/pom.xml
+++ b/flink-autoscaler-plugin-jdbc/pom.xml
@@ -66,6 +66,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test dependencies -->
 
         <dependency>
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java
similarity index 58%
copy from 
flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
copy to 
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java
index 4d8abc22..a1b22a0b 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,16 +15,31 @@
  * limitations under the License.
  */
 
-CREATE DATABASE flink_autoscaler;
-\c flink_autoscaler;
-
-CREATE TABLE t_flink_autoscaler_state_store
-(
-    id            BIGSERIAL     NOT NULL,
-    update_time   TIMESTAMP     NOT NULL,
-    job_key       TEXT          NOT NULL,
-    state_type    TEXT          NOT NULL,
-    state_value   TEXT          NOT NULL,
-    PRIMARY KEY (id),
-    UNIQUE (job_key, state_type)
-);
+package org.apache.flink.autoscaler.jdbc.event;
+
+import lombok.Data;
+
+import java.time.Instant;
+
+/**
+ * The Jdbc autoscaler event.
+ *
+ * <p>One instance mirrors one row of the {@code t_flink_autoscaler_event_handler} table. All
+ * fields are final; Lombok's {@code @Data} generates the constructor that takes them in
+ * declaration order together with the getters.
+ */
+@Data
+public class AutoScalerEvent {
+
+    private final long id; // The id column of the row.
+
+    private final Instant createTime; // The create_time column.
+
+    private final Instant updateTime; // The update_time column.
+
+    private final String jobKey; // The job_key column.
+
+    private final String reason; // The event reason, e.g. ScalingReport.
+
+    private final String eventType; // The event type, e.g. Normal or Warning.
+
+    private final String message; // The event message.
+
+    private final int count; // The event_count column.
+
+    private final String eventKey; // The key used for event deduplication.
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
new file mode 100644
index 00000000..c42692a6
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.SneakyThrows;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The event handler which persists its event in JDBC related database.
+ *
+ * <p>When an interval is supplied, an event whose latest stored occurrence was created within
+ * that interval is updated in place (new message, incremented count) instead of being inserted
+ * again.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> The job autoscaler context.
+ */
+@Experimental
+public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements AutoScalerEventHandler<KEY, Context> {
+
+    private final JdbcEventInteractor jdbcEventInteractor;
+
+    public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) {
+        this.jdbcEventInteractor = jdbcEventInteractor;
+    }
+
+    /**
+     * Persists one event for the job of the given context.
+     *
+     * @param context the job context; the string form of its job key identifies the job's rows
+     * @param type the event type, persisted through its string form
+     * @param reason the event reason
+     * @param message the event message
+     * @param messageKey the deduplication key; when null, the message itself is used instead
+     * @param interval the deduplication window; when null, a new event is always created
+     */
+    @SneakyThrows
+    @Override
+    public void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval) {
+        final var jobKey = context.getJobKey().toString();
+        // Hash the string form of the type rather than the type object itself: an enum
+        // constant's hashCode() is identity based and changes between JVM runs, so hashing it
+        // directly would produce a different event key after every restart and defeat the
+        // deduplication against rows persisted earlier.
+        var eventKey =
+                Integer.toString(
+                        Objects.hash(
+                                jobKey,
+                                String.valueOf(type),
+                                reason,
+                                messageKey != null ? messageKey : message));
+        if (interval == null) {
+            // Don't deduplicate when interval is null.
+            jdbcEventInteractor.createEvent(jobKey, reason, type, message, eventKey);
+            return;
+        }
+
+        final var oldEventOpt = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
+        // Updating the old event when old event is present and the old event is created within
+        // interval to avoid generating a large number of duplicate events.
+        // Creating a new event when old event isn't present or old event is created before
+        // interval.
+        if (oldEventOpt.isPresent() && intervalCheck(oldEventOpt.get(), interval)) {
+            final var oldEvent = oldEventOpt.get();
+            jdbcEventInteractor.updateEvent(oldEvent.getId(), message, oldEvent.getCount() + 1);
+        } else {
+            jdbcEventInteractor.createEvent(jobKey, reason, type, message, eventKey);
+        }
+    }
+
+    /**
+     * Persists a scaling event.
+     *
+     * <p>Messages containing {@code SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED} are passed
+     * to the interface's default implementation with a null interval. All other messages are
+     * stored through {@link #handleEvent} as a {@code Type.Normal} event with the
+     * {@code SCALING_REPORT_REASON} reason, keyed by the parallelism hash code.
+     */
+    @Override
+    public void handleScalingEvent(
+            Context context,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            String message,
+            Duration interval) {
+        if (message.contains(SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED)) {
+            // Don't deduplicate when scaling happens.
+            AutoScalerEventHandler.super.handleScalingEvent(
+                    context, scalingSummaries, message, null);
+        } else {
+            // When scaling doesn't happen, autoscaler will generate a lot of scaling events.
+            // So we deduplicate event based on the parallelism hashcode. If the recommended
+            // parallelism isn't changed, we only create a new ScalingReport event per interval.
+            handleEvent(
+                    context,
+                    Type.Normal,
+                    SCALING_REPORT_REASON,
+                    AutoScalerEventHandler.scalingReport(scalingSummaries, message),
+                    AutoScalerEventHandler.getParallelismHashCode(scalingSummaries),
+                    interval);
+        }
+    }
+
+    /**
+     * @return True means the existing event was created within the interval duration, so we can
+     *     update it. Otherwise, it was created too long ago and we should create a new one
+     *     instead of updating it.
+     */
+    private boolean intervalCheck(AutoScalerEvent existing, Duration interval) {
+        return existing.getCreateTime()
+                .isAfter(jdbcEventInteractor.getCurrentInstant().minusMillis(interval.toMillis()));
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
new file mode 100644
index 00000000..5e2d6166
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.event;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Responsible for interacting with the database. */
+public class JdbcEventInteractor {
+
+    private final Connection conn;
+    // Replaceable through setClock so that tests can control the timestamps.
+    private Clock clock = Clock.systemDefaultZone();
+
+    public JdbcEventInteractor(Connection conn) {
+        this.conn = conn;
+    }
+
+    /**
+     * Finds the event with the largest id among the rows matching the given job key, reason and
+     * event key.
+     *
+     * @return the matching event with the highest id, or empty if no row matches
+     * @throws Exception if the query fails
+     */
+    public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
+            throws Exception {
+        var query =
+                "select * from t_flink_autoscaler_event_handler "
+                        + "where job_key = ? and reason = ? and event_key = ? ";
+
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setString(1, jobKey);
+            pstmt.setString(2, reason);
+            pstmt.setString(3, eventKey);
+
+            // Close the result set explicitly instead of relying on the statement to do it.
+            try (var rs = pstmt.executeQuery()) {
+                // A better approach of finding the latestEvent is sql query desc the id and
+                // limit 1, but the limit syntax is different for different databases.
+                AutoScalerEvent latestEvent = null;
+                while (rs.next()) {
+                    var currentEvent = generateEvent(rs);
+                    if (latestEvent == null || latestEvent.getId() < currentEvent.getId()) {
+                        // If the current event is newer than the latestEvent, then update the
+                        // latestEvent.
+                        latestEvent = currentEvent;
+                    }
+                }
+                return Optional.ofNullable(latestEvent);
+            }
+        }
+    }
+
+    /** Builds an event from the row the given result set is currently positioned on. */
+    private AutoScalerEvent generateEvent(ResultSet rs) throws SQLException {
+        return new AutoScalerEvent(
+                rs.getLong("id"),
+                rs.getTimestamp("create_time").toInstant(),
+                rs.getTimestamp("update_time").toInstant(),
+                rs.getString("job_key"),
+                rs.getString("reason"),
+                rs.getString("event_type"),
+                rs.getString("message"),
+                rs.getInt("event_count"),
+                rs.getString("event_key"));
+    }
+
+    /**
+     * Inserts a new event row with an event count of 1. The create time and the update time are
+     * both set to the current instant of the clock.
+     *
+     * @throws Exception if the insert fails
+     */
+    public void createEvent(
+            String jobKey,
+            String reason,
+            AutoScalerEventHandler.Type type,
+            String message,
+            String eventKey)
+            throws Exception {
+        var query =
+                "INSERT INTO t_flink_autoscaler_event_handler ("
+                        + "create_time, update_time, job_key, reason, event_type, message, event_count, event_key)"
+                        + " values (?, ?, ?, ?, ?, ?, ?, ?)";
+
+        var createTime = Timestamp.from(clock.instant());
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setTimestamp(1, createTime);
+            pstmt.setTimestamp(2, createTime);
+            pstmt.setString(3, jobKey);
+            pstmt.setString(4, reason);
+            pstmt.setString(5, type.toString());
+            pstmt.setString(6, message);
+            pstmt.setInt(7, 1);
+            pstmt.setString(8, eventKey);
+            pstmt.executeUpdate();
+        }
+    }
+
+    /**
+     * Sets the message, the event count and the update time (current instant of the clock) of
+     * the row with the given id.
+     *
+     * @throws IllegalStateException if the statement did not update exactly one row
+     * @throws Exception if the update fails
+     */
+    public void updateEvent(long id, String message, int eventCount) throws Exception {
+        var query =
+                "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
+
+        var updateTime = Timestamp.from(clock.instant());
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setTimestamp(1, updateTime);
+            pstmt.setString(2, message);
+            pstmt.setInt(3, eventCount);
+            pstmt.setLong(4, id);
+            checkState(pstmt.executeUpdate() == 1, "Update event id=[%s] fails.", id);
+        }
+    }
+
+    /** Returns the current instant of the clock used for the persisted timestamps. */
+    public Instant getCurrentInstant() {
+        return clock.instant();
+    }
+
+    /** Returns every event stored for the given job key and reason. */
+    @VisibleForTesting
+    protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws Exception {
+        var query =
+                "select * from t_flink_autoscaler_event_handler "
+                        + "where job_key = ? and reason = ? ";
+
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setString(1, jobKey);
+            pstmt.setString(2, reason);
+
+            // Close the result set explicitly instead of relying on the statement to do it.
+            try (var rs = pstmt.executeQuery()) {
+                var events = new ArrayList<AutoScalerEvent>();
+                while (rs.next()) {
+                    events.add(generateEvent(rs));
+                }
+                return events;
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void setClock(@Nonnull Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
index eb5cab6a..2ae16f84 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
@@ -24,3 +24,22 @@ CREATE TABLE t_flink_autoscaler_state_store
     state_value   CLOB NOT NULL,
     PRIMARY KEY (id)
 );
+
+CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store 
(job_key, state_type);
+
+/* Autoscaler events persisted by the JDBC event handler. event_key is used for event deduplication. */
+CREATE TABLE t_flink_autoscaler_event_handler
+(
+    id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT 
BY 1),
+    create_time TIMESTAMP NOT NULL,
+    update_time TIMESTAMP NOT NULL,
+    job_key VARCHAR(191) NOT NULL,
+    reason VARCHAR(500) NOT NULL,
+    event_type VARCHAR(100) NOT NULL,
+    message CLOB NOT NULL,
+    event_count INTEGER NOT NULL,
+    event_key VARCHAR(100) NOT NULL,
+    PRIMARY KEY (id)
+);
+
+CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler 
(job_key, reason, event_key);
+CREATE INDEX job_key_reason_create_time_idx ON 
t_flink_autoscaler_event_handler (job_key, reason, create_time);
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
index 5e72512e..7b280463 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
@@ -30,3 +30,19 @@ create table `t_flink_autoscaler_state_store`
     unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree
 ) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
 
+/* NOTE(review): reason is varchar(191) here but VARCHAR(500) in the Derby and Postgres schemas, presumably to stay within MySQL index key length limits - confirm. */
+create table `t_flink_autoscaler_event_handler`
+(
+    `id`            bigint       not null auto_increment,
+    `create_time`   datetime     not null comment 'The create time',
+    `update_time`   datetime     not null comment 'The update time',
+    `job_key`       varchar(191) not null comment 'The job key',
+    `reason`        varchar(191) not null comment 'The event reason, such as: 
ScalingReport, IneffectiveScaling and AutoscalerError, etc.',
+    `event_type`    varchar(100) not null comment 'The event type, such as: 
Normal, Warning.',
+    `message`       longtext     not null comment 'The event message.',
+    `event_count`   int          not null comment 'The count of current 
event.',
+    `event_key`     varchar(100) not null comment 'The event key is used for 
event deduplication.',
+    primary key (`id`) using btree,
+    INDEX `job_key_reason_event_key_idx` (`job_key`, `reason`, `event_key`),
+    INDEX `job_key_reason_create_time_idx` (`job_key`, `reason`, `create_time`)
+) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
+
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
index 4d8abc22..ea0d0118 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
@@ -28,3 +28,19 @@ CREATE TABLE t_flink_autoscaler_state_store
     PRIMARY KEY (id),
     UNIQUE (job_key, state_type)
 );
+
+/* Autoscaler events persisted by the JDBC event handler. event_key is used for event deduplication. */
+CREATE TABLE t_flink_autoscaler_event_handler
+(
+    id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+    create_time     TIMESTAMP       NOT NULL,
+    update_time     TIMESTAMP       NOT NULL,
+    job_key         VARCHAR(191)    NOT NULL,
+    reason          VARCHAR(500)    NOT NULL,
+    event_type      VARCHAR(100)    NOT NULL,
+    message         TEXT            NOT NULL,
+    event_count     INTEGER         NOT NULL,
+    event_key       VARCHAR(100)    NOT NULL
+);
+
+CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler 
(job_key, reason, event_key);
+CREATE INDEX job_key_reason_create_time_idx ON 
t_flink_autoscaler_event_handler (job_key, reason, create_time);
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
new file mode 100644
index 00000000..b08030f1
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.event;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.event.AutoscalerEventUtils;
+import org.apache.flink.autoscaler.event.VertexScalingReport;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */
+abstract class AbstractJdbcAutoscalerEventHandlerITCase implements 
DatabaseTest {
+
+    private final String jobVertex = "1b51e99e55e89e404d9a0443fd98d9e2";
+    private final Duration interval = Duration.ofMinutes(30);
+    private final int currentParallelism = 1;
+    private final int newParallelism = 2;
+    private final double metricAvg = 10.1d;
+    private final double metricCurrent = 20.5d;
+    private final Instant createTime = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
+    private final Map<JobVertexID, ScalingSummary> scalingSummaries =
+            generateScalingSummaries(currentParallelism, newParallelism, 
metricAvg, metricCurrent);
+
+    private CountableJdbcEventInteractor jdbcEventInteractor;
+    private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> 
eventHandler;
+    private JobAutoScalerContext<JobID> ctx;
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        jdbcEventInteractor = new 
CountableJdbcEventInteractor(getConnection());
+        jdbcEventInteractor.setClock(Clock.fixed(createTime, 
ZoneId.systemDefault()));
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor);
+        ctx = createDefaultJobAutoScalerContext();
+    }
+
+    /** All events shouldn't be deduplicated when interval is null. */
+    @Test
+    void testEventWithoutInterval() throws Exception {
+        var reason = "ExpectedEventReason";
+        var message = "ExpectedEventMessage";
+
+        jdbcEventInteractor.assertCounters(0, 0, 0);
+
+        eventHandler.handleEvent(
+                ctx, AutoScalerEventHandler.Type.Normal, reason, message, 
null, null);
+        jdbcEventInteractor.assertCounters(0, 0, 1);
+
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .singleElement()
+                .satisfies(event -> assertEvent(event, createTime, createTime, 
reason, message, 1));
+
+        // Handle the same event.
+        jdbcEventInteractor.setClock(
+                Clock.fixed(createTime.plusSeconds(1), 
ZoneId.systemDefault()));
+        eventHandler.handleEvent(
+                ctx, AutoScalerEventHandler.Type.Normal, reason, message, 
null, null);
+        jdbcEventInteractor.assertCounters(0, 0, 2);
+
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .hasSize(2)
+                .as("All events shouldn't be deduplicated when interval is 
null.")
+                .satisfiesExactlyInAnyOrder(
+                        event -> assertEvent(event, createTime, createTime, 
reason, message, 1),
+                        event ->
+                                assertEvent(
+                                        event,
+                                        createTime.plusSeconds(1),
+                                        createTime.plusSeconds(1),
+                                        reason,
+                                        message,
+                                        1));
+    }
+
+    /**
+     * The message will be the message key, and the event should be 
deduplicated within interval.
+     */
+    @Test
+    void testEventIntervalWithoutMessageKey() throws Exception {
+        var reason = "ExpectedEventReason";
+        var message = "ExpectedEventMessage";
+
+        jdbcEventInteractor.assertCounters(0, 0, 0);
+        eventHandler.handleEvent(
+                ctx, AutoScalerEventHandler.Type.Normal, reason, message, 
null, interval);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .singleElement()
+                .satisfies(event -> assertEvent(event, createTime, createTime, 
reason, message, 1));
+        jdbcEventInteractor.assertCounters(1, 0, 1);
+
+        // Handle the same event within interval.
+        final Instant updateTime = 
createTime.plusSeconds(Duration.ofMinutes(20).toSeconds());
+        jdbcEventInteractor.setClock(Clock.fixed(updateTime, 
ZoneId.systemDefault()));
+
+        eventHandler.handleEvent(
+                ctx, AutoScalerEventHandler.Type.Normal, reason, message, 
null, interval);
+        jdbcEventInteractor.assertCounters(2, 1, 1);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .singleElement()
+                .as(
+                        "We expect to update old event instead of create a new 
one when handler the same event within interval.")
+                .satisfies(event -> assertEvent(event, createTime, updateTime, 
reason, message, 2));
+
+        // Handle the same event after interval.
+        final Instant secondCreateTime = 
createTime.plusSeconds(Duration.ofMinutes(40).toSeconds());
+        jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, 
ZoneId.systemDefault()));
+
+        eventHandler.handleEvent(
+                ctx, AutoScalerEventHandler.Type.Normal, reason, message, 
null, interval);
+        jdbcEventInteractor.assertCounters(3, 1, 2);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .as("We expect to create a new event when handler the same 
event after interval.")
+                .hasSize(2)
+                .satisfiesExactlyInAnyOrder(
+                        event -> assertEvent(event, createTime, updateTime, 
reason, message, 2),
+                        event ->
+                                assertEvent(
+                                        event,
+                                        secondCreateTime,
+                                        secondCreateTime,
+                                        reason,
+                                        message,
+                                        1));
+    }
+
+    /** The event should be deduplicated within interval when the message key 
is same. */
+    @Test
+    void testEventWithIntervalAndMessageKey() throws Exception {
+        var reason = "ExpectedEventReason";
+        var messageKey = "ExpectedMessageKey";
+        var firstMessage = "FirstMessage";
+        var secondMessage = "SecondMessage";
+        var thirdMessage = "ThirdMessage";
+
+        jdbcEventInteractor.assertCounters(0, 0, 0);
+        eventHandler.handleEvent(
+                ctx,
+                AutoScalerEventHandler.Type.Normal,
+                reason,
+                firstMessage,
+                messageKey,
+                interval);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .singleElement()
+                .satisfies(
+                        event ->
+                                assertEvent(
+                                        event, createTime, createTime, reason, 
firstMessage, 1));
+        jdbcEventInteractor.assertCounters(1, 0, 1);
+
+        // Handle the same event within interval.
+        final Instant updateTime = 
createTime.plusSeconds(Duration.ofMinutes(20).toSeconds());
+        jdbcEventInteractor.setClock(Clock.fixed(updateTime, 
ZoneId.systemDefault()));
+
+        eventHandler.handleEvent(
+                ctx,
+                AutoScalerEventHandler.Type.Normal,
+                reason,
+                secondMessage,
+                messageKey,
+                interval);
+        jdbcEventInteractor.assertCounters(2, 1, 1);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .singleElement()
+                .as(
+                        "We expect to update old event instead of create a new 
one when handler the same event within interval.")
+                .satisfies(
+                        event ->
+                                assertEvent(
+                                        event, createTime, updateTime, reason, 
secondMessage, 2));
+
+        // Handle the same event after interval.
+        final Instant secondCreateTime = 
createTime.plusSeconds(Duration.ofMinutes(40).toSeconds());
+        jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, 
ZoneId.systemDefault()));
+
+        eventHandler.handleEvent(
+                ctx,
+                AutoScalerEventHandler.Type.Normal,
+                reason,
+                thirdMessage,
+                messageKey,
+                interval);
+        jdbcEventInteractor.assertCounters(3, 1, 2);
+        assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
reason))
+                .as("We expect to create a new event when handler the same 
event after interval.")
+                .hasSize(2)
+                .satisfiesExactlyInAnyOrder(
+                        event ->
+                                assertEvent(
+                                        event, createTime, updateTime, reason, 
secondMessage, 2),
+                        event ->
+                                assertEvent(
+                                        event,
+                                        secondCreateTime,
+                                        secondCreateTime,
+                                        reason,
+                                        thirdMessage,
+                                        1));
+    }
+
+    /**
+     * With scaling execution enabled, every scaling report must be stored as a separate event,
+     * even when an identical report is handled again shortly afterwards.
+     */
+    @Test
+    void testScalingEventWithScalingHappens() throws Exception {
+        final var jobKey = ctx.getJobKey().toString();
+        final var enabledHeader =
+                AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
+        final var summaries =
+                generateScalingSummaries(
+                        currentParallelism, newParallelism, metricAvg, metricCurrent);
+
+        // First report: a single insert, without any latest-event lookup or update.
+        jdbcEventInteractor.assertCounters(0, 0, 0);
+        eventHandler.handleScalingEvent(ctx, summaries, enabledHeader, interval);
+        jdbcEventInteractor.assertCounters(0, 0, 1);
+
+        var eventsAfterFirstReport =
+                jdbcEventInteractor.queryEvents(
+                        jobKey, AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(eventsAfterFirstReport)
+                .singleElement()
+                .satisfies(
+                        event -> {
+                            assertScalingEvent(event, createTime, createTime, 1);
+                            assertScalingReport(
+                                    event.getMessage(),
+                                    jobVertex,
+                                    currentParallelism,
+                                    newParallelism,
+                                    metricAvg,
+                                    metricCurrent,
+                                    metricAvg);
+                        });
+
+        // Handle the very same summaries again, one second later.
+        final Instant secondEventTime = createTime.plusSeconds(1);
+        jdbcEventInteractor.setClock(Clock.fixed(secondEventTime, ZoneId.systemDefault()));
+
+        eventHandler.handleScalingEvent(ctx, summaries, enabledHeader, interval);
+        jdbcEventInteractor.assertCounters(0, 0, 2);
+
+        var eventsAfterSecondReport =
+                jdbcEventInteractor.queryEvents(
+                        jobKey, AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(eventsAfterSecondReport)
+                .as("All scaling events shouldn't be deduplicated when scaling happens.")
+                .hasSize(2)
+                .satisfiesExactlyInAnyOrder(
+                        event -> assertScalingEvent(event, createTime, createTime, 1),
+                        event -> assertScalingEvent(event, secondEventTime, secondEventTime, 1))
+                .allSatisfy(
+                        event ->
+                                assertScalingReport(
+                                        event.getMessage(),
+                                        jobVertex,
+                                        currentParallelism,
+                                        newParallelism,
+                                        metricAvg,
+                                        metricCurrent,
+                                        metricAvg));
+    }
+
+    /**
+     * A repeated scaling report is deduplicated only when the parallelism is unchanged and the
+     * previous event is still within the interval: the existing event is updated (newest message,
+     * incremented count) instead of a new event being created.
+     */
+    @Test
+    void testScalingEventDeduplication() throws Exception {
+        createFirstScalingEvent();
+
+        // The metric changed, but parallelism is not changed.
+        final double changedMetricAvg = 11.1d;
+        final Instant updateTime = createTime.plusSeconds(1);
+        jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault()));
+
+        // Use newParallelism instead of a literal so the generated summary cannot drift from the
+        // parallelism that is asserted on the stored report below.
+        Map<JobVertexID, ScalingSummary> newScalingSummaries =
+                generateScalingSummaries(
+                        currentParallelism, newParallelism, changedMetricAvg, metricCurrent);
+        eventHandler.handleScalingEvent(
+                ctx,
+                newScalingSummaries,
+                AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED,
+                interval);
+        // Counters are (query, update, create): a second lookup, one update, no further insert.
+        jdbcEventInteractor.assertCounters(2, 1, 1);
+
+        assertThat(
+                        jdbcEventInteractor.queryEvents(
+                                ctx.getJobKey().toString(),
+                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                .as(
+                        "The event should be deduplicated when parallelism is not changed and within the interval.")
+                .singleElement()
+                .satisfies(
+                        event -> {
+                            // Create time is kept, update time and count reflect the second report.
+                            assertScalingEvent(event, createTime, updateTime, 2);
+                            // Metric is changed.
+                            assertScalingReport(
+                                    event.getMessage(),
+                                    jobVertex,
+                                    currentParallelism,
+                                    newParallelism,
+                                    changedMetricAvg,
+                                    metricCurrent,
+                                    changedMetricAvg);
+                        });
+    }
+
+    /**
+     * Once the previous event lies outside the interval, an identical scaling report must be
+     * stored as a new event rather than merged into the old one.
+     */
+    @Test
+    void testScalingEventNotWithinInterval() throws Exception {
+        createFirstScalingEvent();
+
+        // Same parallelism, but the clock is moved 30 minutes past the first event.
+        final Instant newCreateTime = createTime.plus(Duration.ofMinutes(30));
+        jdbcEventInteractor.setClock(Clock.fixed(newCreateTime, ZoneId.systemDefault()));
+
+        final var disabledHeader =
+                AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
+        eventHandler.handleScalingEvent(ctx, scalingSummaries, disabledHeader, interval);
+        // Counters are (query, update, create): a second lookup, no update, a second insert.
+        jdbcEventInteractor.assertCounters(2, 0, 2);
+
+        var storedEvents =
+                jdbcEventInteractor.queryEvents(
+                        ctx.getJobKey().toString(), AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(storedEvents)
+                .as("We should create a new event when the old event is too early.")
+                .hasSize(2)
+                .satisfiesExactlyInAnyOrder(
+                        event -> assertScalingEvent(event, createTime, createTime, 1),
+                        event -> assertScalingEvent(event, newCreateTime, newCreateTime, 1))
+                .allSatisfy(
+                        event ->
+                                assertScalingReport(
+                                        event.getMessage(),
+                                        jobVertex,
+                                        currentParallelism,
+                                        newParallelism,
+                                        metricAvg,
+                                        metricCurrent,
+                                        metricAvg));
+    }
+
+    /**
+     * A scaling report with a different target parallelism must be stored as a new event, even
+     * when it is handled right after the previous report.
+     */
+    @Test
+    void testScalingEventWithParallelismChange() throws Exception {
+        createFirstScalingEvent();
+
+        // The parallelism is changed.
+        final Instant newCreateTime = createTime.plusSeconds(1);
+        jdbcEventInteractor.setClock(Clock.fixed(newCreateTime, ZoneId.systemDefault()));
+
+        var secondNewParallelism = 3;
+        Map<JobVertexID, ScalingSummary> newScalingSummaries =
+                generateScalingSummaries(
+                        currentParallelism, secondNewParallelism, metricAvg, metricCurrent);
+        eventHandler.handleScalingEvent(
+                ctx,
+                newScalingSummaries,
+                AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED,
+                interval);
+        // Counters are (query, update, create): a second lookup, no update, a second insert.
+        jdbcEventInteractor.assertCounters(2, 0, 2);
+
+        assertThat(
+                        jdbcEventInteractor.queryEvents(
+                                ctx.getJobKey().toString(),
+                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                // The description previously repeated the "old event is too early" text of the
+                // interval test, which does not describe this scenario.
+                .as("We should create a new event when the parallelism is changed.")
+                .hasSize(2)
+                .satisfiesExactlyInAnyOrder(
+                        event -> {
+                            assertScalingEvent(event, createTime, createTime, 1);
+                            assertScalingReport(
+                                    event.getMessage(),
+                                    jobVertex,
+                                    currentParallelism,
+                                    newParallelism,
+                                    metricAvg,
+                                    metricCurrent,
+                                    metricAvg);
+                        },
+                        event -> {
+                            assertScalingEvent(event, newCreateTime, newCreateTime, 1);
+                            assertScalingReport(
+                                    event.getMessage(),
+                                    jobVertex,
+                                    currentParallelism,
+                                    secondNewParallelism,
+                                    metricAvg,
+                                    metricCurrent,
+                                    metricAvg);
+                        });
+    }
+
+    /**
+     * Handles the shared {@code scalingSummaries} once with scaling execution disabled and checks
+     * that exactly one scaling event was stored (one latest-event lookup, no update, one insert).
+     */
+    private void createFirstScalingEvent() throws Exception {
+        final var disabledHeader =
+                AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
+
+        jdbcEventInteractor.assertCounters(0, 0, 0);
+        eventHandler.handleScalingEvent(ctx, scalingSummaries, disabledHeader, interval);
+        jdbcEventInteractor.assertCounters(1, 0, 1);
+
+        var storedEvents =
+                jdbcEventInteractor.queryEvents(
+                        ctx.getJobKey().toString(), AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(storedEvents)
+                .singleElement()
+                .satisfies(
+                        event -> {
+                            assertScalingEvent(event, createTime, createTime, 1);
+                            assertScalingReport(
+                                    event.getMessage(),
+                                    jobVertex,
+                                    currentParallelism,
+                                    newParallelism,
+                                    metricAvg,
+                                    metricCurrent,
+                                    metricAvg);
+                        });
+    }
+
+    /**
+     * Parses {@code scalingReport} into vertex scaling reports and asserts that it contains exactly
+     * one report equal to the one described by the expected values.
+     *
+     * @param scalingReport the event message to parse
+     * @param expectedJobVertex the expected vertex id
+     * @param expectedCurrentParallelism the expected current parallelism
+     * @param expectedNewParallelism the expected new parallelism
+     * @param expectedCurrentProcessCapacity the expected current process capacity
+     * @param expectedExpectedProcessCapacity the expected value of the expected process capacity
+     * @param expectedTargetDataRate the expected target data rate
+     */
+    private void assertScalingReport(
+            String scalingReport,
+            String expectedJobVertex,
+            int expectedCurrentParallelism,
+            int expectedNewParallelism,
+            double expectedCurrentProcessCapacity,
+            double expectedExpectedProcessCapacity,
+            double expectedTargetDataRate) {
+        // Build the expected report first ...
+        var expected = new VertexScalingReport();
+        expected.setVertexId(expectedJobVertex);
+        expected.setCurrentParallelism(expectedCurrentParallelism);
+        expected.setNewParallelism(expectedNewParallelism);
+        expected.setCurrentProcessCapacity(expectedCurrentProcessCapacity);
+        expected.setExpectedProcessCapacity(expectedExpectedProcessCapacity);
+        expected.setTargetDataRate(expectedTargetDataRate);
+
+        // ... then compare it with the single report parsed from the message.
+        var parsedReports = AutoscalerEventUtils.parseVertexScalingReports(scalingReport);
+        assertThat(parsedReports).singleElement().isEqualTo(expected);
+    }
+
+    /**
+     * Builds a scaling summary map with a single entry for the {@code jobVertex} of this test.
+     *
+     * <p>One evaluated metric carrying {@code metricAvg} and {@code metricCurrent} is shared by
+     * the true processing rate, the expected processing rate and the target data rate.
+     */
+    @Nonnull
+    private Map<JobVertexID, ScalingSummary> generateScalingSummaries(
+            int currentParallelism, int newParallelism, double metricAvg, double metricCurrent) {
+        var sharedMetric = new EvaluatedScalingMetric();
+        sharedMetric.setAverage(metricAvg);
+        sharedMetric.setCurrent(metricCurrent);
+
+        var metricsByType =
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE, sharedMetric,
+                        ScalingMetric.EXPECTED_PROCESSING_RATE, sharedMetric,
+                        ScalingMetric.TARGET_DATA_RATE, sharedMetric);
+        var summary = new ScalingSummary(currentParallelism, newParallelism, metricsByType);
+
+        return Map.of(JobVertexID.fromHexString(jobVertex), summary);
+    }
+
+    /**
+     * Asserts the timestamps, job key, reason, type, message and count of a stored event. The job
+     * key is expected to be the one of {@code ctx} and the type to be {@code Normal}.
+     */
+    private void assertEvent(
+            AutoScalerEvent event,
+            Instant expectedCreateTime,
+            Instant expectedUpdateTime,
+            String expectedReason,
+            String expectedMessage,
+            int expectedCount) {
+        final var expectedJobKey = ctx.getJobKey().toString();
+        final var expectedEventType = AutoScalerEventHandler.Type.Normal.toString();
+
+        assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
+        assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
+        assertThat(event.getJobKey()).isEqualTo(expectedJobKey);
+        assertThat(event.getReason()).isEqualTo(expectedReason);
+        assertThat(event.getEventType()).isEqualTo(expectedEventType);
+        assertThat(event.getMessage()).isEqualTo(expectedMessage);
+        assertThat(event.getCount()).isEqualTo(expectedCount);
+    }
+
+    /**
+     * Asserts the timestamps and count of a stored scaling event, and that it belongs to the job of
+     * {@code ctx}, has the scaling report reason and is of type {@code Normal}. The message is not
+     * checked here.
+     */
+    private void assertScalingEvent(
+            AutoScalerEvent event,
+            Instant expectedCreateTime,
+            Instant expectedUpdateTime,
+            int expectedCount) {
+        final var expectedJobKey = ctx.getJobKey().toString();
+        final var expectedEventType = AutoScalerEventHandler.Type.Normal.toString();
+
+        assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
+        assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
+        assertThat(event.getJobKey()).isEqualTo(expectedJobKey);
+        assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(event.getEventType()).isEqualTo(expectedEventType);
+        assertThat(event.getCount()).isEqualTo(expectedCount);
+    }
+}
+
+/** Runs the inherited {@link JdbcAutoScalerEventHandler} IT cases on Derby, using {@link DerbyTestBase}. */
+class DerbyJdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
+        implements DerbyTestBase {}
+
+/** Runs the inherited {@link JdbcAutoScalerEventHandler} IT cases on MySQL 5.6.x, using {@link MySQL56TestBase}. */
+class MySQL56JdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
+        implements MySQL56TestBase {}
+
+/** Runs the inherited {@link JdbcAutoScalerEventHandler} IT cases on MySQL 5.7.x, using {@link MySQL57TestBase}. */
+class MySQL57JdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
+        implements MySQL57TestBase {}
+
+/** Runs the inherited {@link JdbcAutoScalerEventHandler} IT cases on MySQL 8.x, using {@link MySQL8TestBase}. */
+class MySQL8JdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
+        implements MySQL8TestBase {}
+
+/** Runs the inherited {@link JdbcAutoScalerEventHandler} IT cases on PostgreSQL, using {@link PostgreSQLTestBase}. */
+class PostgreSQLJdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
+        implements PostgreSQLTestBase {}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
new file mode 100644
index 00000000..2fe69201
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.event;
+
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * The abstract IT case for {@link JdbcEventInteractor}.
+ *
+ * <p>The connection under test is obtained from {@code getConnection()}, which this class
+ * inherits through {@link DatabaseTest}.
+ */
+abstract class AbstractJdbcEventInteractorITCase implements DatabaseTest {
+
+    /**
+     * Creates two events for the same job, reason and event key, then updates the second one, and
+     * checks after every step what the latest-event query returns.
+     */
+    @Test
+    void testAllOperations() throws Exception {
+        final String jobKey = "jobKey";
+        final String reason = "ScalingReport";
+        final String firstMessage = "Expected event message.";
+        final String secondMessage = firstMessage + 2;
+        final String eventKey = String.valueOf(34567);
+        final AutoScalerEventHandler.Type normal = AutoScalerEventHandler.Type.Normal;
+
+        // The datetime precision is seconds in MySQL by default.
+        final Instant firstCreateTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        final Instant secondCreateTime = firstCreateTime.plusSeconds(5);
+        final Instant updateTime = secondCreateTime.plusSeconds(3);
+
+        try (var conn = getConnection()) {
+            var interactor = new JdbcEventInteractor(conn);
+
+            // Step 1: create the first event and read it back.
+            interactor.setClock(fixedClock(firstCreateTime));
+            interactor.createEvent(jobKey, reason, normal, firstMessage, eventKey);
+
+            var latestAfterFirstCreate = interactor.queryLatestEvent(jobKey, reason, eventKey);
+            assertThat(latestAfterFirstCreate).isPresent();
+            assertEvent(
+                    latestAfterFirstCreate.orElseThrow(),
+                    firstCreateTime,
+                    firstCreateTime,
+                    jobKey,
+                    reason,
+                    firstMessage,
+                    1,
+                    eventKey);
+
+            // Step 2: create a second event later; it must become the latest one.
+            interactor.setClock(fixedClock(secondCreateTime));
+            interactor.createEvent(jobKey, reason, normal, secondMessage, eventKey);
+
+            var latestAfterSecondCreate = interactor.queryLatestEvent(jobKey, reason, eventKey);
+            assertThat(latestAfterSecondCreate).isPresent();
+            var secondEvent = latestAfterSecondCreate.orElseThrow();
+            assertEvent(
+                    secondEvent,
+                    secondCreateTime,
+                    secondCreateTime,
+                    jobKey,
+                    reason,
+                    secondMessage,
+                    1,
+                    eventKey);
+
+            // Step 3: update the second event; create time stays, update time/message/count change.
+            var updatedMessage = secondEvent.getMessage() + 3;
+            interactor.setClock(fixedClock(updateTime));
+            interactor.updateEvent(
+                    secondEvent.getId(), updatedMessage, secondEvent.getCount() + 1);
+
+            var latestAfterUpdate = interactor.queryLatestEvent(jobKey, reason, eventKey);
+            assertThat(latestAfterUpdate).isPresent();
+            assertEvent(
+                    latestAfterUpdate.orElseThrow(),
+                    secondCreateTime,
+                    updateTime,
+                    jobKey,
+                    reason,
+                    updatedMessage,
+                    2,
+                    eventKey);
+        }
+    }
+
+    /** Returns a clock frozen at {@code instant} in the system default time zone. */
+    private static Clock fixedClock(Instant instant) {
+        return Clock.fixed(instant, ZoneId.systemDefault());
+    }
+
+    /**
+     * Asserts the timestamps, job key, reason, message, count and event key of {@code event}; the
+     * event type is always expected to be {@code Normal}.
+     */
+    private void assertEvent(
+            AutoScalerEvent event,
+            Instant expectedCreateTime,
+            Instant expectedUpdateTime,
+            String expectedJobKey,
+            String expectedReason,
+            String expectedMessage,
+            int expectedCount,
+            String expectedEventKey) {
+        final var expectedEventType = AutoScalerEventHandler.Type.Normal.toString();
+
+        assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
+        assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
+        assertThat(event.getJobKey()).isEqualTo(expectedJobKey);
+        assertThat(event.getReason()).isEqualTo(expectedReason);
+        assertThat(event.getEventType()).isEqualTo(expectedEventType);
+        assertThat(event.getMessage()).isEqualTo(expectedMessage);
+        assertThat(event.getCount()).isEqualTo(expectedCount);
+        assertThat(event.getEventKey()).isEqualTo(expectedEventKey);
+    }
+}
+
+/** Runs the inherited {@link JdbcEventInteractor} IT cases on Derby, using {@link DerbyTestBase}. */
+class DerbyJdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase
+        implements DerbyTestBase {}
+
+/** Runs the inherited {@link JdbcEventInteractor} IT cases on MySQL 5.6.x, using {@link MySQL56TestBase}. */
+class MySQL56JdbcEventInteractorITCase extends 
AbstractJdbcEventInteractorITCase
+        implements MySQL56TestBase {}
+
+/** Runs the inherited {@link JdbcEventInteractor} IT cases on MySQL 5.7.x, using {@link MySQL57TestBase}. */
+class MySQL57JdbcEventInteractorITCase extends 
AbstractJdbcEventInteractorITCase
+        implements MySQL57TestBase {}
+
+/** Runs the inherited {@link JdbcEventInteractor} IT cases on MySQL 8.x, using {@link MySQL8TestBase}. */
+class MySQL8JdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase
+        implements MySQL8TestBase {}
+
+/** Runs the inherited {@link JdbcEventInteractor} IT cases on PostgreSQL, using {@link PostgreSQLTestBase}. */
+class PostgreSQLJdbcEventInteractorITCase extends 
AbstractJdbcEventInteractorITCase
+        implements PostgreSQLTestBase {}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
new file mode 100644
index 00000000..664a28bd
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.event;
+
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+
+import java.sql.Connection;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A {@link JdbcEventInteractor} for tests that counts the calls to {@link #queryLatestEvent},
+ * {@link #createEvent} and {@link #updateEvent} before delegating to the parent implementation.
+ *
+ * <p>Only these three methods are overridden, so other inherited methods are not counted.
+ */
+class CountableJdbcEventInteractor extends JdbcEventInteractor {
+
+    private final AtomicLong queryCounter = new AtomicLong();
+    private final AtomicLong createCounter = new AtomicLong();
+    private final AtomicLong updateCounter = new AtomicLong();
+
+    public CountableJdbcEventInteractor(Connection conn) {
+        super(conn);
+    }
+
+    /** Counts the call, then queries the latest event through the parent implementation. */
+    @Override
+    public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
+            throws Exception {
+        // The counter is bumped before delegating, so failed calls are counted as well.
+        queryCounter.getAndIncrement();
+        return super.queryLatestEvent(jobKey, reason, eventKey);
+    }
+
+    /** Counts the call, then creates the event through the parent implementation. */
+    @Override
+    public void createEvent(
+            String jobKey,
+            String reason,
+            AutoScalerEventHandler.Type type,
+            String message,
+            String eventKey)
+            throws Exception {
+        createCounter.getAndIncrement();
+        super.createEvent(jobKey, reason, type, message, eventKey);
+    }
+
+    /** Counts the call, then updates the event through the parent implementation. */
+    @Override
+    public void updateEvent(long id, String message, int eventCount) throws Exception {
+        updateCounter.getAndIncrement();
+        super.updateEvent(id, message, eventCount);
+    }
+
+    /**
+     * Asserts the number of calls seen so far. Note the argument order: query, update, create.
+     *
+     * @param expectedQueryCounter expected number of {@link #queryLatestEvent} calls
+     * @param expectedUpdateCounter expected number of {@link #updateEvent} calls
+     * @param expectedCreateCounter expected number of {@link #createEvent} calls
+     */
+    public void assertCounters(
+            long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) {
+        assertThat(queryCounter).hasValue(expectedQueryCounter);
+        assertThat(updateCounter).hasValue(expectedUpdateCounter);
+        assertThat(createCounter).hasValue(expectedCreateCounter);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
index 0c78d978..42566d96 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -30,7 +30,8 @@ import java.util.List;
 /** The extension of Derby. */
 public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, 
AfterEachCallback {
 
-    private static final List<String> TABLES = 
List.of("t_flink_autoscaler_state_store");
+    private static final List<String> TABLES =
+            List.of("t_flink_autoscaler_state_store", 
"t_flink_autoscaler_event_handler");
     private static final String JDBC_URL = "jdbc:derby:memory:test";
 
     public Connection getConnection() throws Exception {
@@ -52,12 +53,36 @@ public class DerbyExtension implements BeforeAllCallback, 
AfterAllCallback, Afte
                         + "    PRIMARY KEY (id)\n"
                         + ")\n";
 
-        var createIndex =
+        var createStateStoreIndex =
                 "CREATE UNIQUE INDEX un_job_state_type_inx ON 
t_flink_autoscaler_state_store (job_key, state_type)";
+
+        var eventHandlerDDL =
+                "CREATE TABLE t_flink_autoscaler_event_handler\n"
+                        + "(\n"
+                        + "    id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY 
(START WITH 1, INCREMENT BY 1),\n"
+                        + "    create_time TIMESTAMP NOT NULL,\n"
+                        + "    update_time TIMESTAMP NOT NULL,\n"
+                        + "    job_key VARCHAR(191) NOT NULL,\n"
+                        + "    reason VARCHAR(500) NOT NULL,\n"
+                        + "    event_type VARCHAR(100) NOT NULL,\n"
+                        + "    message CLOB NOT NULL,\n"
+                        + "    event_count INTEGER NOT NULL,\n"
+                        + "    event_key VARCHAR(100) NOT NULL,\n"
+                        + "    PRIMARY KEY (id)\n"
+                        + ")\n";
+
+        var eventKeyIndex =
+                "CREATE INDEX job_key_reason_event_key_idx ON 
t_flink_autoscaler_event_handler (job_key, reason, event_key)";
+        var jobKeyReasonCreateTimeIndex =
+                "CREATE INDEX job_key_reason_create_time_idx ON 
t_flink_autoscaler_event_handler (job_key, reason, create_time)";
+
         try (var conn = getConnection();
                 var st = conn.createStatement()) {
             st.execute(stateStoreDDL);
-            st.execute(createIndex);
+            st.execute(createStateStoreIndex);
+            st.execute(eventHandlerDDL);
+            st.execute(eventKeyIndex);
+            st.execute(jobKeyReasonCreateTimeIndex);
         }
     }
 
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
index 2fcd468c..d2e39ad0 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
@@ -26,3 +26,18 @@ create table `t_flink_autoscaler_state_store`
     unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree
 ) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
 
+-- MySQL test schema for the autoscaler event table; the column comments describe each field.
+-- NOTE(review): the two secondary indexes presumably serve lookups by
+-- (job_key, reason, event_key) and by (job_key, reason, create_time) - confirm against the queries.
+create table `t_flink_autoscaler_event_handler`
+(
+    `id`            bigint       not null auto_increment,
+    `create_time`   datetime     not null comment 'The create time',
+    `update_time`   datetime     not null comment 'The update time',
+    `job_key`       varchar(191) not null comment 'The job key',
+    `reason`        varchar(191) not null comment 'The event reason, such as: 
ScalingReport, IneffectiveScaling and AutoscalerError, etc.',
+    `event_type`    varchar(100) not null comment 'The event type, such as: 
Normal, Warning.',
+    `message`       longtext     not null comment 'The event message.',
+    `event_count`   int          not null comment 'The count of current 
event.',
+    `event_key`     varchar(100) not null comment 'The event key is used for 
event deduplication.',
+    primary key (`id`) using btree,
+    INDEX `job_key_reason_event_key_idx` (`job_key`, `reason`, `event_key`),
+    INDEX `job_key_reason_create_time_idx` (`job_key`, `reason`, `create_time`)
+) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
index 0f405d10..733401ce 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
@@ -25,3 +25,19 @@ CREATE TABLE t_flink_autoscaler_state_store
     PRIMARY KEY (id),
     UNIQUE (job_key, state_type)
 );
+
+-- PostgreSQL test schema for the autoscaler event table, followed by its two secondary indexes.
+-- NOTE(review): the indexes presumably serve lookups by (job_key, reason, event_key) and by
+-- (job_key, reason, create_time) - confirm against the queries.
+CREATE TABLE t_flink_autoscaler_event_handler
+(
+    id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+    create_time     TIMESTAMP       NOT NULL,
+    update_time     TIMESTAMP       NOT NULL,
+    job_key         VARCHAR(191)    NOT NULL,
+    reason          VARCHAR(500)    NOT NULL,
+    event_type      VARCHAR(100)    NOT NULL,
+    message         TEXT            NOT NULL,
+    event_count     INTEGER         NOT NULL,
+    event_key       VARCHAR(100)    NOT NULL
+);
+
+CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler 
(job_key, reason, event_key);
+CREATE INDEX job_key_reason_create_time_idx ON 
t_flink_autoscaler_event_handler (job_key, reason, create_time);

Reply via email to