This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 9fda27a855549d9e3d132523642f732a3a554354 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Wed Jan 31 17:07:44 2024 +0800 [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler --- flink-autoscaler-plugin-jdbc/pom.xml | 7 + .../autoscaler/jdbc/event/AutoScalerEvent.java} | 43 +- .../jdbc/event/JdbcAutoScalerEventHandler.java | 115 +++++ .../autoscaler/jdbc/event/JdbcEventInteractor.java | 155 ++++++ .../src/main/resources/schema/derby_schema.sql | 19 + .../src/main/resources/schema/mysql_schema.sql | 16 + .../src/main/resources/schema/postgres_schema.sql | 16 + .../AbstractJdbcAutoscalerEventHandlerITCase.java | 550 +++++++++++++++++++++ .../event/AbstractJdbcEventInteractorITCase.java | 147 ++++++ .../jdbc/event/CountableJdbcEventInteractor.java | 73 +++ .../testutils/databases/derby/DerbyExtension.java | 31 +- .../test/resources/test_schema/mysql_schema.sql | 15 + .../test/resources/test_schema/postgres_schema.sql | 16 + 13 files changed, 1186 insertions(+), 17 deletions(-) diff --git a/flink-autoscaler-plugin-jdbc/pom.xml b/flink-autoscaler-plugin-jdbc/pom.xml index 097d4f22..ccef24d4 100644 --- a/flink-autoscaler-plugin-jdbc/pom.xml +++ b/flink-autoscaler-plugin-jdbc/pom.xml @@ -66,6 +66,13 @@ under the License. 
<scope>provided</scope> </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>provided</scope> + </dependency> + <!-- Test dependencies --> <dependency> diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java similarity index 58% copy from flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql copy to flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java index 4d8abc22..a1b22a0b 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/AutoScalerEvent.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,16 +15,31 @@ * limitations under the License. */ -CREATE DATABASE flink_autoscaler; -\c flink_autoscaler; - -CREATE TABLE t_flink_autoscaler_state_store -( - id BIGSERIAL NOT NULL, - update_time TIMESTAMP NOT NULL, - job_key TEXT NOT NULL, - state_type TEXT NOT NULL, - state_value TEXT NOT NULL, - PRIMARY KEY (id), - UNIQUE (job_key, state_type) -); +package org.apache.flink.autoscaler.jdbc.event; + +import lombok.Data; + +import java.time.Instant; + +/** The Jdbc autoscaler event. 
*/ +@Data +public class AutoScalerEvent { + + private final long id; + + private final Instant createTime; + + private final Instant updateTime; + + private final String jobKey; + + private final String reason; + + private final String eventType; + + private final String message; + + private final int count; + + private final String eventKey; +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java new file mode 100644 index 00000000..c42692a6 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.event; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import lombok.SneakyThrows; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; + +/** + * The event handler which persists its event in JDBC related database. + * + * @param <KEY> The job key. + * @param <Context> The job autoscaler context. + */ +@Experimental +public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> + implements AutoScalerEventHandler<KEY, Context> { + + private final JdbcEventInteractor jdbcEventInteractor; + + public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) { + this.jdbcEventInteractor = jdbcEventInteractor; + } + + @SneakyThrows + @Override + public void handleEvent( + Context context, + Type type, + String reason, + String message, + @Nullable String messageKey, + @Nullable Duration interval) { + final var jobKey = context.getJobKey().toString(); + var eventKey = + Integer.toString( + Objects.hash( + jobKey, type, reason, messageKey != null ? messageKey : message)); + if (interval == null) { + // Don't deduplicate when interval is null. + jdbcEventInteractor.createEvent(jobKey, reason, type, message, eventKey); + return; + } + + final var oldEventOpt = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey); + // Updating the old event when old event is present and the old event is created within + // interval to avoid generating a large number of duplicate events. + // Creating a new event when old event isn't present or old event is created before + // interval. 
+ if (oldEventOpt.isPresent() && intervalCheck(oldEventOpt.get(), interval)) { + final var oldEvent = oldEventOpt.get(); + jdbcEventInteractor.updateEvent(oldEvent.getId(), message, oldEvent.getCount() + 1); + } else { + jdbcEventInteractor.createEvent(jobKey, reason, type, message, eventKey); + } + } + + @Override + public void handleScalingEvent( + Context context, + Map<JobVertexID, ScalingSummary> scalingSummaries, + String message, + Duration interval) { + if (message.contains(SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED)) { + // Don't deduplicate when scaling happens. + AutoScalerEventHandler.super.handleScalingEvent( + context, scalingSummaries, message, null); + } else { + // When scaling doesn't happen, autoscaler will generate a lot of scaling events. + // So we deduplicate events based on the parallelism hashcode. If the recommended + // parallelism isn't changed, we only create a new ScalingReport event per interval. + handleEvent( + context, + Type.Normal, + SCALING_REPORT_REASON, + AutoScalerEventHandler.scalingReport(scalingSummaries, message), + AutoScalerEventHandler.getParallelismHashCode(scalingSummaries), + interval); + } + } + + /** + * @return True if the existing event was created within the interval, so it can be updated. + * Otherwise, the existing event is too old, and we should create a new one instead of updating it. 
+ */ + private boolean intervalCheck(AutoScalerEvent existing, Duration interval) { + return existing.getCreateTime() + .isAfter(jdbcEventInteractor.getCurrentInstant().minusMillis(interval.toMillis())); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java new file mode 100644 index 00000000..5e2d6166 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.event; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Clock; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Responsible for interacting with the database. */ +public class JdbcEventInteractor { + + private final Connection conn; + private Clock clock = Clock.systemDefaultZone(); + + public JdbcEventInteractor(Connection conn) { + this.conn = conn; + } + + public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey) + throws Exception { + var query = + "select * from t_flink_autoscaler_event_handler " + + "where job_key = ? and reason = ? and event_key = ? "; + + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setString(1, jobKey); + pstmt.setString(2, reason); + pstmt.setString(3, eventKey); + + var rs = pstmt.executeQuery(); + // A better approach of finding the latestEvent is sql query desc the id and limit 1, + // but the limit syntax is different for different databases. + AutoScalerEvent latestEvent = null; + while (rs.next()) { + var currentEvent = generateEvent(rs); + if (latestEvent == null || latestEvent.getId() < currentEvent.getId()) { + // If the current event is newer than the latestEvent, then update the + // latestEvent. 
+ latestEvent = currentEvent; + } + } + return Optional.ofNullable(latestEvent); + } + } + + private AutoScalerEvent generateEvent(ResultSet rs) throws SQLException { + return new AutoScalerEvent( + rs.getLong("id"), + rs.getTimestamp("create_time").toInstant(), + rs.getTimestamp("update_time").toInstant(), + rs.getString("job_key"), + rs.getString("reason"), + rs.getString("event_type"), + rs.getString("message"), + rs.getInt("event_count"), + rs.getString("event_key")); + } + + public void createEvent( + String jobKey, + String reason, + AutoScalerEventHandler.Type type, + String message, + String eventKey) + throws Exception { + var query = + "INSERT INTO t_flink_autoscaler_event_handler (" + + "create_time, update_time, job_key, reason, event_type, message, event_count, event_key)" + + " values (?, ?, ?, ?, ?, ?, ?, ?)"; + + var createTime = Timestamp.from(clock.instant()); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setTimestamp(1, createTime); + pstmt.setTimestamp(2, createTime); + pstmt.setString(3, jobKey); + pstmt.setString(4, reason); + pstmt.setString(5, type.toString()); + pstmt.setString(6, message); + pstmt.setInt(7, 1); + pstmt.setString(8, eventKey); + pstmt.executeUpdate(); + } + } + + public void updateEvent(long id, String message, int eventCount) throws Exception { + var query = + "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? 
where id = ?"; + + var updateTime = Timestamp.from(clock.instant()); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setTimestamp(1, updateTime); + pstmt.setString(2, message); + pstmt.setInt(3, eventCount); + pstmt.setLong(4, id); + checkState(pstmt.executeUpdate() == 1, "Update event id=[%s] fails.", id); + } + } + + public Instant getCurrentInstant() { + return clock.instant(); + } + + @VisibleForTesting + protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws Exception { + var query = + "select * from t_flink_autoscaler_event_handler " + + "where job_key = ? and reason = ? "; + + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setString(1, jobKey); + pstmt.setString(2, reason); + + var rs = pstmt.executeQuery(); + var events = new ArrayList<AutoScalerEvent>(); + while (rs.next()) { + events.add(generateEvent(rs)); + } + return events; + } + } + + @VisibleForTesting + void setClock(@Nonnull Clock clock) { + this.clock = Preconditions.checkNotNull(clock); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql index eb5cab6a..2ae16f84 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql @@ -24,3 +24,22 @@ CREATE TABLE t_flink_autoscaler_state_store state_value CLOB NOT NULL, PRIMARY KEY (id) ); + +CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store (job_key, state_type); + +CREATE TABLE t_flink_autoscaler_event_handler +( + id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), + create_time TIMESTAMP NOT NULL, + update_time TIMESTAMP NOT NULL, + job_key VARCHAR(191) NOT NULL, + reason VARCHAR(500) NOT NULL, + event_type VARCHAR(100) NOT NULL, + message CLOB NOT NULL, + event_count INTEGER NOT NULL, + event_key VARCHAR(100) NOT NULL, + PRIMARY KEY (id) 
+); + +CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler (job_key, reason, event_key); +CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time); diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql index 5e72512e..7b280463 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql @@ -30,3 +30,19 @@ create table `t_flink_autoscaler_state_store` unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree ) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; +create table `t_flink_autoscaler_event_handler` +( + `id` bigint not null auto_increment, + `create_time` datetime not null comment 'The create time', + `update_time` datetime not null comment 'The update time', + `job_key` varchar(191) not null comment 'The job key', + `reason` varchar(191) not null comment 'The event reason, such as: ScalingReport, IneffectiveScaling and AutoscalerError, etc.', + `event_type` varchar(100) not null comment 'The event type, such as: Normal, Warning.', + `message` longtext not null comment 'The event message.', + `event_count` int not null comment 'The count of current event.', + `event_key` varchar(100) not null comment 'The event key is used for event deduplication.', + primary key (`id`) using btree, + INDEX `job_key_reason_event_key_idx` (`job_key`, `reason`, `event_key`), + INDEX `job_key_reason_create_time_idx` (`job_key`, `reason`, `create_time`) +) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; + diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql index 4d8abc22..ea0d0118 100644 --- 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql @@ -28,3 +28,19 @@ CREATE TABLE t_flink_autoscaler_state_store PRIMARY KEY (id), UNIQUE (job_key, state_type) ); + +CREATE TABLE t_flink_autoscaler_event_handler +( + id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + create_time TIMESTAMP NOT NULL, + update_time TIMESTAMP NOT NULL, + job_key VARCHAR(191) NOT NULL, + reason VARCHAR(500) NOT NULL, + event_type VARCHAR(100) NOT NULL, + message TEXT NOT NULL, + event_count INTEGER NOT NULL, + event_key VARCHAR(100) NOT NULL +); + +CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler (job_key, reason, event_key); +CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java new file mode 100644 index 00000000..b08030f1 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.event; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.event.AutoscalerEventUtils; +import org.apache.flink.autoscaler.event.VertexScalingReport; +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.Map; + +import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.assertj.core.api.Assertions.assertThat; + +/** The abstract IT case for {@link 
JdbcAutoScalerEventHandler}. */ +abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest { + + private final String jobVertex = "1b51e99e55e89e404d9a0443fd98d9e2"; + private final Duration interval = Duration.ofMinutes(30); + private final int currentParallelism = 1; + private final int newParallelism = 2; + private final double metricAvg = 10.1d; + private final double metricCurrent = 20.5d; + private final Instant createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); + private final Map<JobVertexID, ScalingSummary> scalingSummaries = + generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent); + + private CountableJdbcEventInteractor jdbcEventInteractor; + private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler; + private JobAutoScalerContext<JobID> ctx; + + @BeforeEach + void beforeEach() throws Exception { + jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection()); + jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault())); + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor); + ctx = createDefaultJobAutoScalerContext(); + } + + /** All events shouldn't be deduplicated when interval is null. */ + @Test + void testEventWithoutInterval() throws Exception { + var reason = "ExpectedEventReason"; + var message = "ExpectedEventMessage"; + + jdbcEventInteractor.assertCounters(0, 0, 0); + + eventHandler.handleEvent( + ctx, AutoScalerEventHandler.Type.Normal, reason, message, null, null); + jdbcEventInteractor.assertCounters(0, 0, 1); + + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .singleElement() + .satisfies(event -> assertEvent(event, createTime, createTime, reason, message, 1)); + + // Handler the same event. 
+ jdbcEventInteractor.setClock( + Clock.fixed(createTime.plusSeconds(1), ZoneId.systemDefault())); + eventHandler.handleEvent( + ctx, AutoScalerEventHandler.Type.Normal, reason, message, null, null); + jdbcEventInteractor.assertCounters(0, 0, 2); + + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .hasSize(2) + .as("All events shouldn't be deduplicated when interval is null.") + .satisfiesExactlyInAnyOrder( + event -> assertEvent(event, createTime, createTime, reason, message, 1), + event -> + assertEvent( + event, + createTime.plusSeconds(1), + createTime.plusSeconds(1), + reason, + message, + 1)); + } + + /** + * The message will be the message key, and the event should be deduplicated within interval. + */ + @Test + void testEventIntervalWithoutMessageKey() throws Exception { + var reason = "ExpectedEventReason"; + var message = "ExpectedEventMessage"; + + jdbcEventInteractor.assertCounters(0, 0, 0); + eventHandler.handleEvent( + ctx, AutoScalerEventHandler.Type.Normal, reason, message, null, interval); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .singleElement() + .satisfies(event -> assertEvent(event, createTime, createTime, reason, message, 1)); + jdbcEventInteractor.assertCounters(1, 0, 1); + + // Handler the same event within interval. 
+ final Instant updateTime = createTime.plusSeconds(Duration.ofMinutes(20).toSeconds()); + jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault())); + + eventHandler.handleEvent( + ctx, AutoScalerEventHandler.Type.Normal, reason, message, null, interval); + jdbcEventInteractor.assertCounters(2, 1, 1); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .singleElement() + .as( + "We expect to update old event instead of create a new one when handler the same event within interval.") + .satisfies(event -> assertEvent(event, createTime, updateTime, reason, message, 2)); + + // Handler the same event after interval. + final Instant secondCreateTime = createTime.plusSeconds(Duration.ofMinutes(40).toSeconds()); + jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, ZoneId.systemDefault())); + + eventHandler.handleEvent( + ctx, AutoScalerEventHandler.Type.Normal, reason, message, null, interval); + jdbcEventInteractor.assertCounters(3, 1, 2); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .as("We expect to create a new event when handler the same event after interval.") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + event -> assertEvent(event, createTime, updateTime, reason, message, 2), + event -> + assertEvent( + event, + secondCreateTime, + secondCreateTime, + reason, + message, + 1)); + } + + /** The event should be deduplicated within interval when the message key is same. 
*/ + @Test + void testEventWithIntervalAndMessageKey() throws Exception { + var reason = "ExpectedEventReason"; + var messageKey = "ExpectedMessageKey"; + var firstMessage = "FirstMessage"; + var secondMessage = "SecondMessage"; + var thirdMessage = "ThirdMessage"; + + jdbcEventInteractor.assertCounters(0, 0, 0); + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + reason, + firstMessage, + messageKey, + interval); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .singleElement() + .satisfies( + event -> + assertEvent( + event, createTime, createTime, reason, firstMessage, 1)); + jdbcEventInteractor.assertCounters(1, 0, 1); + + // Handler the same event within interval. + final Instant updateTime = createTime.plusSeconds(Duration.ofMinutes(20).toSeconds()); + jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault())); + + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + reason, + secondMessage, + messageKey, + interval); + jdbcEventInteractor.assertCounters(2, 1, 1); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .singleElement() + .as( + "We expect to update old event instead of create a new one when handler the same event within interval.") + .satisfies( + event -> + assertEvent( + event, createTime, updateTime, reason, secondMessage, 2)); + + // Handler the same event after interval. 
+ final Instant secondCreateTime = createTime.plusSeconds(Duration.ofMinutes(40).toSeconds()); + jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, ZoneId.systemDefault())); + + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + reason, + thirdMessage, + messageKey, + interval); + jdbcEventInteractor.assertCounters(3, 1, 2); + assertThat(jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), reason)) + .as("We expect to create a new event when handler the same event after interval.") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + event -> + assertEvent( + event, createTime, updateTime, reason, secondMessage, 2), + event -> + assertEvent( + event, + secondCreateTime, + secondCreateTime, + reason, + thirdMessage, + 1)); + } + + /** All scaling events shouldn't be deduplicated when scaling happens. */ + @Test + void testScalingEventWithScalingHappens() throws Exception { + Map<JobVertexID, ScalingSummary> scalingSummaries = + generateScalingSummaries( + currentParallelism, newParallelism, metricAvg, metricCurrent); + + jdbcEventInteractor.assertCounters(0, 0, 0); + eventHandler.handleScalingEvent( + ctx, + scalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED, + interval); + jdbcEventInteractor.assertCounters(0, 0, 1); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .singleElement() + .satisfies( + event -> { + assertScalingEvent(event, createTime, createTime, 1); + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + metricAvg, + metricCurrent, + metricAvg); + }); + + // Handler the same event. 
+ final Instant updateTime = createTime.plusSeconds(1); + jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault())); + + eventHandler.handleScalingEvent( + ctx, + scalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED, + interval); + jdbcEventInteractor.assertCounters(0, 0, 2); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .as("All scaling events shouldn't be deduplicated when scaling happens.") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + event -> assertScalingEvent(event, createTime, createTime, 1), + event -> assertScalingEvent(event, updateTime, updateTime, 1)) + .allSatisfy( + event -> + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + metricAvg, + metricCurrent, + metricAvg)); + } + + /** The deduplication only works when parallelism is not changed and within the interval. */ + @Test + void testScalingEventDeduplication() throws Exception { + createFirstScalingEvent(); + + // The metric is changed, but the parallelism is not changed. + final Instant updateTime = createTime.plusSeconds(1); + jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault())); + + Map<JobVertexID, ScalingSummary> newScalingSummaries = + generateScalingSummaries(currentParallelism, 2, 11.1d, metricCurrent); + eventHandler.handleScalingEvent( + ctx, + newScalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED, + interval); + jdbcEventInteractor.assertCounters(2, 1, 1); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .as( + "The event should be deduplicated when parallelism is not changed and within the interval.") + .singleElement() + .satisfies( + event -> { + assertScalingEvent(event, createTime, updateTime, 2); + // Metric is changed. 
+ assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + 11.1d, + metricCurrent, + 11.1d); + }); + } + + /** We should create a new event after the interval. */ + @Test + void testScalingEventNotWithinInterval() throws Exception { + createFirstScalingEvent(); + + // The parallelism is not changed, but the old event is too early. + final Instant newCreateTime = createTime.plusSeconds(Duration.ofMinutes(30).toSeconds()); + jdbcEventInteractor.setClock(Clock.fixed(newCreateTime, ZoneId.systemDefault())); + + eventHandler.handleScalingEvent( + ctx, + scalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED, + interval); + jdbcEventInteractor.assertCounters(2, 0, 2); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .as("We should create a new event when the old event is too early.") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + event -> assertScalingEvent(event, createTime, createTime, 1), + event -> assertScalingEvent(event, newCreateTime, newCreateTime, 1)) + .allSatisfy( + event -> + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + metricAvg, + metricCurrent, + metricAvg)); + } + + /** We should create a new event when the parallelism is changed. */ + @Test + void testScalingEventWithParallelismChange() throws Exception { + createFirstScalingEvent(); + + // The parallelism is changed. 
+ final Instant newCreateTime = createTime.plusSeconds(1); + jdbcEventInteractor.setClock(Clock.fixed(newCreateTime, ZoneId.systemDefault())); + + var secondNewParallelism = 3; + Map<JobVertexID, ScalingSummary> newScalingSummaries = + generateScalingSummaries( + currentParallelism, secondNewParallelism, metricAvg, metricCurrent); + eventHandler.handleScalingEvent( + ctx, + newScalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED, + interval); + jdbcEventInteractor.assertCounters(2, 0, 2); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .as("We should create a new event when the old event is too early.") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + event -> { + assertScalingEvent(event, createTime, createTime, 1); + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + metricAvg, + metricCurrent, + metricAvg); + }, + event -> { + assertScalingEvent(event, newCreateTime, newCreateTime, 1); + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + secondNewParallelism, + metricAvg, + metricCurrent, + metricAvg); + }); + } + + private void createFirstScalingEvent() throws Exception { + jdbcEventInteractor.assertCounters(0, 0, 0); + eventHandler.handleScalingEvent( + ctx, + scalingSummaries, + AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED, + interval); + jdbcEventInteractor.assertCounters(1, 0, 1); + + assertThat( + jdbcEventInteractor.queryEvents( + ctx.getJobKey().toString(), + AutoScalerEventHandler.SCALING_REPORT_REASON)) + .singleElement() + .satisfies( + event -> { + assertScalingEvent(event, createTime, createTime, 1); + assertScalingReport( + event.getMessage(), + jobVertex, + currentParallelism, + newParallelism, + metricAvg, + metricCurrent, + metricAvg); + }); + } + + private void assertScalingReport( + String scalingReport, + String 
expectedJobVertex, + int expectedCurrentParallelism, + int expectedNewParallelism, + double expectedCurrentProcessCapacity, + double expectedExpectedProcessCapacity, + double expectedTargetDataRate) { + var vertexScalingReports = AutoscalerEventUtils.parseVertexScalingReports(scalingReport); + + var expectedVertexScalingReport = new VertexScalingReport(); + expectedVertexScalingReport.setVertexId(expectedJobVertex); + expectedVertexScalingReport.setCurrentParallelism(expectedCurrentParallelism); + expectedVertexScalingReport.setNewParallelism(expectedNewParallelism); + expectedVertexScalingReport.setCurrentProcessCapacity(expectedCurrentProcessCapacity); + expectedVertexScalingReport.setExpectedProcessCapacity(expectedExpectedProcessCapacity); + expectedVertexScalingReport.setTargetDataRate(expectedTargetDataRate); + assertThat(vertexScalingReports).singleElement().isEqualTo(expectedVertexScalingReport); + } + + @Nonnull + private Map<JobVertexID, ScalingSummary> generateScalingSummaries( + int currentParallelism, int newParallelism, double metricAvg, double metricCurrent) { + var jobVertexID = JobVertexID.fromHexString(jobVertex); + var evaluatedScalingMetric = new EvaluatedScalingMetric(); + evaluatedScalingMetric.setAverage(metricAvg); + evaluatedScalingMetric.setCurrent(metricCurrent); + return Map.of( + jobVertexID, + new ScalingSummary( + currentParallelism, + newParallelism, + Map.of( + ScalingMetric.TRUE_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.EXPECTED_PROCESSING_RATE, + evaluatedScalingMetric, + ScalingMetric.TARGET_DATA_RATE, + evaluatedScalingMetric))); + } + + private void assertEvent( + AutoScalerEvent event, + Instant expectedCreateTime, + Instant expectedUpdateTime, + String expectedReason, + String expectedMessage, + int expectedCount) { + assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime); + assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime); + 
+ assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString()); + assertThat(event.getReason()).isEqualTo(expectedReason); + assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString()); + assertThat(event.getMessage()).isEqualTo(expectedMessage); + assertThat(event.getCount()).isEqualTo(expectedCount); + } + + private void assertScalingEvent( + AutoScalerEvent event, + Instant expectedCreateTime, + Instant expectedUpdateTime, + int expectedCount) { + assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime); + assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime); + assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString()); + assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON); + assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString()); + assertThat(event.getCount()).isEqualTo(expectedCount); + } +} + +/** Test {@link JdbcAutoScalerEventHandler} via Derby. */ +class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase + implements DerbyTestBase {} + +/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */ +class MySQL56JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase + implements MySQL56TestBase {} + +/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.7.x. */ +class MySQL57JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase + implements MySQL57TestBase {} + +/** Test {@link JdbcAutoScalerEventHandler} via MySQL 8.x. */ +class MySQL8JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase + implements MySQL8TestBase {} + +/** Test {@link JdbcAutoScalerEventHandler} via PostgreSQL.
*/ +class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase + implements PostgreSQLTestBase {} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java new file mode 100644 index 00000000..2fe69201 --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler.jdbc.event; + +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest; +import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase; +import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase; + +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** The abstract IT case for {@link JdbcEventInteractor}. */ +abstract class AbstractJdbcEventInteractorITCase implements DatabaseTest { + + @Test + void testAllOperations() throws Exception { + var jobKey = "jobKey"; + var reason = "ScalingReport"; + var message = "Expected event message."; + var eventKey = Integer.toString(34567); + + // The datetime precision is seconds in MySQL by default. + var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); + try (var conn = getConnection()) { + var jdbcEventInteractor = new JdbcEventInteractor(conn); + jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault())); + + jdbcEventInteractor.createEvent( + jobKey, reason, AutoScalerEventHandler.Type.Normal, message, eventKey); + var firstEventOptional = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey); + assertThat(firstEventOptional).isPresent(); + assertEvent( + firstEventOptional.get(), + createTime, + createTime, + jobKey, + reason, + message, + 1, + eventKey); + + // The create time is changed for the second event. 
+ var secondCreateTime = createTime.plusSeconds(5); + jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, ZoneId.systemDefault())); + jdbcEventInteractor.createEvent( + jobKey, reason, AutoScalerEventHandler.Type.Normal, message + 2, eventKey); + // The latest event should be the second event. + var secondEventOptional = + jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey); + assertThat(secondEventOptional).isPresent(); + var secondEvent = secondEventOptional.get(); + assertEvent( + secondEvent, + secondCreateTime, + secondCreateTime, + jobKey, + reason, + message + 2, + 1, + eventKey); + + // Update event + var updateTime = secondCreateTime.plusSeconds(3); + jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault())); + jdbcEventInteractor.updateEvent( + secondEvent.getId(), secondEvent.getMessage() + 3, secondEvent.getCount() + 1); + + var updatedEventOptional = + jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey); + assertThat(updatedEventOptional).isPresent(); + var updatedEvent = updatedEventOptional.get(); + assertEvent( + updatedEvent, + secondCreateTime, + updateTime, + jobKey, + reason, + secondEvent.getMessage() + 3, + 2, + eventKey); + } + } + + private void assertEvent( + AutoScalerEvent event, + Instant expectedCreateTime, + Instant expectedUpdateTime, + String expectedJobKey, + String expectedReason, + String expectedMessage, + int expectedCount, + String expectedEventKey) { + assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime); + assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime); + assertThat(event.getJobKey()).isEqualTo(expectedJobKey); + assertThat(event.getReason()).isEqualTo(expectedReason); + assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString()); + assertThat(event.getMessage()).isEqualTo(expectedMessage); + assertThat(event.getCount()).isEqualTo(expectedCount); + assertThat(event.getEventKey()).isEqualTo(expectedEventKey); + } +} + 
+/** Test {@link JdbcEventInteractor} via Derby. */ +class DerbyJdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase + implements DerbyTestBase {} + +/** Test {@link JdbcEventInteractor} via MySQL 5.6.x. */ +class MySQL56JdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase + implements MySQL56TestBase {} + +/** Test {@link JdbcEventInteractor} via MySQL 5.7.x. */ +class MySQL57JdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase + implements MySQL57TestBase {} + +/** Test {@link JdbcEventInteractor} via MySQL 8.x. */ +class MySQL8JdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase + implements MySQL8TestBase {} + +/** Test {@link JdbcEventInteractor} via PostgreSQL. */ +class PostgreSQLJdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase + implements PostgreSQLTestBase {} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java new file mode 100644 index 00000000..664a28bd --- /dev/null +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.jdbc.event; + +import org.apache.flink.autoscaler.event.AutoScalerEventHandler; + +import java.sql.Connection; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Countable {@link JdbcEventInteractor}. */ +class CountableJdbcEventInteractor extends JdbcEventInteractor { + + private final AtomicLong queryCounter; + private final AtomicLong createCounter; + private final AtomicLong updateCounter; + + public CountableJdbcEventInteractor(Connection conn) { + super(conn); + queryCounter = new AtomicLong(); + createCounter = new AtomicLong(); + updateCounter = new AtomicLong(); + } + + @Override + public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey) + throws Exception { + queryCounter.incrementAndGet(); + return super.queryLatestEvent(jobKey, reason, eventKey); + } + + @Override + public void createEvent( + String jobKey, + String reason, + AutoScalerEventHandler.Type type, + String message, + String eventKey) + throws Exception { + createCounter.incrementAndGet(); + super.createEvent(jobKey, reason, type, message, eventKey); + } + + @Override + public void updateEvent(long id, String message, int eventCount) throws Exception { + updateCounter.incrementAndGet(); + super.updateEvent(id, message, eventCount); + } + + public void assertCounters( + long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) { + 
assertThat(queryCounter).hasValue(expectedQueryCounter); + assertThat(updateCounter).hasValue(expectedUpdateCounter); + assertThat(createCounter).hasValue(expectedCreateCounter); + } +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java index 0c78d978..42566d96 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java @@ -30,7 +30,8 @@ import java.util.List; /** The extension of Derby. */ public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback { - private static final List<String> TABLES = List.of("t_flink_autoscaler_state_store"); + private static final List<String> TABLES = + List.of("t_flink_autoscaler_state_store", "t_flink_autoscaler_event_handler"); private static final String JDBC_URL = "jdbc:derby:memory:test"; public Connection getConnection() throws Exception { @@ -52,12 +53,36 @@ public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, Afte + " PRIMARY KEY (id)\n" + ")\n"; - var createIndex = + var createStateStoreIndex = "CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store (job_key, state_type)"; + + var eventHandlerDDL = + "CREATE TABLE t_flink_autoscaler_event_handler\n" + + "(\n" + + " id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" + + " create_time TIMESTAMP NOT NULL,\n" + + " update_time TIMESTAMP NOT NULL,\n" + + " job_key VARCHAR(191) NOT NULL,\n" + + " reason VARCHAR(500) NOT NULL,\n" + + " event_type VARCHAR(100) NOT NULL,\n" + + " message CLOB NOT NULL,\n" + + " event_count INTEGER NOT NULL,\n" + + " event_key VARCHAR(100) 
NOT NULL,\n" + + " PRIMARY KEY (id)\n" + + ")\n"; + + var eventKeyIndex = + "CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler (job_key, reason, event_key)"; + var jobKeyReasonCreateTimeIndex = + "CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time)"; + try (var conn = getConnection(); var st = conn.createStatement()) { st.execute(stateStoreDDL); - st.execute(createIndex); + st.execute(createStateStoreIndex); + st.execute(eventHandlerDDL); + st.execute(eventKeyIndex); + st.execute(jobKeyReasonCreateTimeIndex); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql index 2fcd468c..d2e39ad0 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql @@ -26,3 +26,18 @@ create table `t_flink_autoscaler_state_store` unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree ) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; +create table `t_flink_autoscaler_event_handler` +( + `id` bigint not null auto_increment, + `create_time` datetime not null comment 'The create time', + `update_time` datetime not null comment 'The update time', + `job_key` varchar(191) not null comment 'The job key', + `reason` varchar(191) not null comment 'The event reason, such as: ScalingReport, IneffectiveScaling and AutoscalerError, etc.', + `event_type` varchar(100) not null comment 'The event type, such as: Normal, Warning.', + `message` longtext not null comment 'The event message.', + `event_count` int not null comment 'The count of current event.', + `event_key` varchar(100) not null comment 'The event key is used for event deduplication.', + primary key (`id`) using btree, + INDEX `job_key_reason_event_key_idx` (`job_key`, `reason`, `event_key`), 
+ INDEX `job_key_reason_create_time_idx` (`job_key`, `reason`, `create_time`) +) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci; diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql index 0f405d10..733401ce 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql +++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql @@ -25,3 +25,19 @@ CREATE TABLE t_flink_autoscaler_state_store PRIMARY KEY (id), UNIQUE (job_key, state_type) ); + +CREATE TABLE t_flink_autoscaler_event_handler +( + id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + create_time TIMESTAMP NOT NULL, + update_time TIMESTAMP NOT NULL, + job_key VARCHAR(191) NOT NULL, + reason VARCHAR(500) NOT NULL, + event_type VARCHAR(100) NOT NULL, + message TEXT NOT NULL, + event_count INTEGER NOT NULL, + event_key VARCHAR(100) NOT NULL +); + +CREATE INDEX job_key_reason_event_key_idx ON t_flink_autoscaler_event_handler (job_key, reason, event_key); +CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time);