This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a426b2f [FLINK-36039][autoscaler] Support clean historical event 
handler records in JDBC event handler (#865)
6a426b2f is described below

commit 6a426b2ff60331b89d67371279f400f8761bf1f3
Author: Yuepeng Pan <flin...@126.com>
AuthorDate: Mon Aug 26 11:29:07 2024 +0800

    [FLINK-36039][autoscaler] Support clean historical event handler records in 
JDBC event handler (#865)
    
    ---------
    
    Co-authored-by: Rui Fan <fan...@apache.org>
---
 .../autoscaler_standalone_configuration.html       |   6 +
 .../jdbc/event/JdbcAutoScalerEventHandler.java     |  74 +++++++-
 .../autoscaler/jdbc/event/JdbcEventInteractor.java |  26 +++
 .../AbstractJdbcAutoscalerEventHandlerITCase.java  | 188 +++++++++++++++++++--
 .../jdbc/event/CountableJdbcEventInteractor.java   |  14 ++
 .../standalone/AutoscalerEventHandlerFactory.java  |   4 +-
 .../standalone/StandaloneAutoscalerExecutor.java   |   1 +
 .../config/AutoscalerStandaloneOptions.java        |   8 +
 .../autoscaler/event/AutoScalerEventHandler.java   |   7 +-
 9 files changed, 308 insertions(+), 20 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html 
b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
index 7d724b7f..45b5184f 100644
--- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
+++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html
@@ -38,6 +38,12 @@
             <td>Integer</td>
             <td>The port of flink cluster when the flink-cluster fetcher is 
used.</td>
         </tr>
+        <tr>
+            <td><h5>autoscaler.standalone.jdbc.event-handler.ttl</h5></td>
+            <td style="word-wrap: break-word;">90 d</td>
+            <td>Duration</td>
+            <td>The time to live based on create time for the JDBC event 
handler records. When the config is set as '0', the ttl strategy for the 
records would be disabled.</td>
+        </tr>
         <tr>
             <td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td>
             <td style="word-wrap: break-word;">"JDBC_PWD"</td>
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
index c42692a6..5b455809 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java
@@ -18,18 +18,27 @@
 package org.apache.flink.autoscaler.jdbc.event;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
+import java.sql.Timestamp;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The event handler which persists its event in JDBC related database.
@@ -38,13 +47,34 @@ import java.util.Objects;
  * @param <Context> The job autoscaler context.
  */
 @Experimental
+@Slf4j
 public class JdbcAutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>>
         implements AutoScalerEventHandler<KEY, Context> {
 
     private final JdbcEventInteractor jdbcEventInteractor;
+    private final Duration eventHandlerTtl;
+    @Nullable private final ScheduledExecutorService 
scheduledEventHandlerCleaner;
 
-    public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) 
{
+    public JdbcAutoScalerEventHandler(
+            JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) 
{
         this.jdbcEventInteractor = jdbcEventInteractor;
+        this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl);
+
+        if (eventHandlerTtl.toMillis() <= 0) {
+            this.scheduledEventHandlerCleaner = null;
+            return;
+        }
+        this.scheduledEventHandlerCleaner =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                
.setNameFormat("jdbc-autoscaler-events-cleaner")
+                                .setDaemon(true)
+                                .build());
+        this.scheduledEventHandlerCleaner.scheduleAtFixedRate(
+                this::cleanExpiredEvents,
+                Duration.ofDays(1).toMillis(),
+                Duration.ofDays(1).toMillis(),
+                TimeUnit.MILLISECONDS);
     }
 
     @SneakyThrows
@@ -104,6 +134,48 @@ public class JdbcAutoScalerEventHandler<KEY, Context 
extends JobAutoScalerContex
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        final var batchSize = 2000;
+        final var sleepMs = 1000;
+
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            var deletedTotalCount = 0L;
+            while (true) {
+                Long minId = 
jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+                if (Objects.isNull(minId)) {
+                    log.info(
+                            "Deleted expired {} event handler records 
successfully",
+                            deletedTotalCount);
+                    break;
+                }
+
+                for (long startId = minId;
+                        
jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
+                                        startId, startId + batchSize, date)
+                                == batchSize;
+                        startId += batchSize) {
+                    Thread.sleep(sleepMs);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error in cleaning expired event handler records.", e);
+        }
+    }
+
     /**
      * @return True means the existing event is still in the interval duration 
we can update it.
      *     Otherwise, it's too early, we should create a new one instead of 
updating it.
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
index 5e2d6166..ce3fc48d 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -152,4 +153,29 @@ public class JdbcEventInteractor {
     void setClock(@Nonnull Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    @Nullable
+    Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
+        var sql =
+                "SELECT id from t_flink_autoscaler_event_handler "
+                        + "           where id = (SELECT id FROM 
t_flink_autoscaler_event_handler order by id asc limit 1) "
+                        + "           and create_time < ?";
+        try (var pstmt = conn.prepareStatement(sql)) {
+            pstmt.setObject(1, timestamp);
+            ResultSet resultSet = pstmt.executeQuery();
+            return resultSet.next() ? resultSet.getLong(1) : null;
+        }
+    }
+
+    int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, 
Timestamp timestamp)
+            throws Exception {
+        var query =
+                "delete from t_flink_autoscaler_event_handler where id >= ? 
and id < ? and create_time < ?";
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setObject(1, startId);
+            pstmt.setObject(2, endId);
+            pstmt.setObject(3, timestamp);
+            return pstmt.executeUpdate();
+        }
+    }
 }
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
index b08030f1..f5278171 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java
@@ -33,19 +33,30 @@ import 
org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nonnull;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static 
org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */
@@ -60,6 +71,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
     private final Instant createTime = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
     private final Map<JobVertexID, ScalingSummary> scalingSummaries =
             generateScalingSummaries(currentParallelism, newParallelism, 
metricAvg, metricCurrent);
+    private final Clock defaultClock = Clock.fixed(createTime, 
ZoneId.systemDefault());
 
     private CountableJdbcEventInteractor jdbcEventInteractor;
     private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> 
eventHandler;
@@ -68,11 +80,16 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
     @BeforeEach
     void beforeEach() throws Exception {
         jdbcEventInteractor = new 
CountableJdbcEventInteractor(getConnection());
-        jdbcEventInteractor.setClock(Clock.fixed(createTime, 
ZoneId.systemDefault()));
-        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor);
+        jdbcEventInteractor.setClock(defaultClock);
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, 
Duration.ZERO);
         ctx = createDefaultJobAutoScalerContext();
     }
 
+    @AfterEach
+    void tearDown() {
+        eventHandler.close();
+    }
+
     /** All events shouldn't be deduplicated when interval is null. */
     @Test
     void testEventWithoutInterval() throws Exception {
@@ -254,8 +271,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .singleElement()
                 .satisfies(
                         event -> {
@@ -283,8 +299,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .as("All scaling events shouldn't be deduplicated when scaling 
happens.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -322,8 +337,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .as(
                         "The event should be deduplicated when parallelism is 
not changed and within the interval.")
                 .singleElement()
@@ -360,8 +374,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .as("We should create a new event when the old event is too 
early.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -401,8 +414,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .as("We should create a new event when the old event is too 
early.")
                 .hasSize(2)
                 .satisfiesExactlyInAnyOrder(
@@ -430,6 +442,131 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
                         });
     }
 
+    @Test
+    void testDeleteCounterWhenIdNotConsecutive() throws Exception {
+        // Create 2 events.
+        final Duration ttl = Duration.ofDays(1L);
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, 
ttl);
+        initTestingEventHandlerRecords(2);
+
+        // Simulate ids are not consecutive.
+        var events =
+                jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), 
SCALING_REPORT_REASON);
+        assertThat(events).hasSize(2);
+        var maxId =
+                events.stream()
+                        .map(AutoScalerEvent::getId)
+                        .max(Comparable::compareTo)
+                        .orElseThrow();
+
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "update t_flink_autoscaler_event_handler set 
id = ? where id = ?")) {
+            ps.setLong(1, maxId + 1_000_000);
+            ps.setLong(2, maxId);
+            ps.execute();
+        }
+
+        // Reset the clock to clean all expired data.
+        jdbcEventInteractor.setClock(
+                Clock.fixed(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .plus(ttl)
+                                .plus(Duration.ofMillis(1)),
+                        ZoneId.systemDefault()));
+
+        eventHandler.cleanExpiredEvents();
+        jdbcEventInteractor.assertDeleteExpiredCounter(2L);
+    }
+
+    private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() {
+        return Stream.of(
+                Arguments.of(false, 128, Duration.ofMinutes(2), 10),
+                Arguments.of(true, 256, Duration.ofMinutes(2), 0),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 12),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 0),
+                Arguments.of(true, 512, Duration.ofMinutes(100), 3),
+                Arguments.of(false, 64, Duration.ofMinutes(100), 0),
+                Arguments.of(true, 1024 * 9, Duration.ofMinutes(100), 64),
+                Arguments.of(false, 1024 * 9, Duration.ofMinutes(100), 0),
+                Arguments.of(false, 0, Duration.ofMinutes(100), 128),
+                Arguments.of(false, 0, Duration.ofMinutes(100), 0));
+    }
+
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest(
+            name =
+                    "tryIdNotSequential:{0}, expiredRecordsNum: {1}, 
eventHandlerTtl: {2}, unexpiredRecordsNum: {3}")
+    void testCleanExpiredEvents(
+            boolean tryIdNotSequential,
+            int expiredRecordsNum,
+            Duration eventHandlerTtl,
+            int unexpiredRecordsNum)
+            throws Exception {
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, 
eventHandlerTtl);
+
+        // Init the expired records.
+        initTestingEventHandlerRecords(expiredRecordsNum);
+        if (tryIdNotSequential) {
+            tryDeleteOneRecord(expiredRecordsNum);
+        }
+        var expiredInstant = jdbcEventInteractor.getCurrentInstant();
+
+        // Init the unexpired records.
+        initTestingEventHandlerRecords(unexpiredRecordsNum);
+
+        // Reset the clock to clean all expired data.
+        jdbcEventInteractor.setClock(
+                Clock.fixed(
+                        
expiredInstant.plus(eventHandlerTtl).plus(Duration.ofMillis(1)),
+                        ZoneId.systemDefault()));
+
+        eventHandler.cleanExpiredEvents();
+
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "select count(1) from 
t_flink_autoscaler_event_handler");
+                ResultSet countResultSet = ps.executeQuery()) {
+            countResultSet.next();
+            
assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum);
+        }
+    }
+
+    private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
+        // To simulate non-sequential IDs in expired records.
+        Timestamp date = Timestamp.from(createTime);
+        Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+        if (minId == null) {
+            return;
+        }
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "delete from t_flink_autoscaler_event_handler 
where id = ?")) {
+            ps.setObject(1, (minId + expiredRecordsNum) / 2);
+            ps.execute();
+        }
+    }
+
+    private void initTestingEventHandlerRecords(int recordsNum) {
+        for (int i = 0; i < recordsNum; i++) {
+            jdbcEventInteractor.setClock(
+                    Clock.fixed(
+                            
jdbcEventInteractor.getCurrentInstant().plusSeconds(1),
+                            ZoneId.systemDefault()));
+            eventHandler.handleEvent(
+                    ctx,
+                    AutoScalerEventHandler.Type.Normal,
+                    SCALING_REPORT_REASON,
+                    "message-" + i,
+                    "messageKey-" + i,
+                    null);
+        }
+    }
+
     private void createFirstScalingEvent() throws Exception {
         jdbcEventInteractor.assertCounters(0, 0, 0);
         eventHandler.handleScalingEvent(
@@ -441,8 +578,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
         assertThat(
                         jdbcEventInteractor.queryEvents(
-                                ctx.getJobKey().toString(),
-                                AutoScalerEventHandler.SCALING_REPORT_REASON))
+                                ctx.getJobKey().toString(), 
SCALING_REPORT_REASON))
                 .singleElement()
                 .satisfies(
                         event -> {
@@ -523,7 +659,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
         assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime);
         assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime);
         assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString());
-        
assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON);
+        assertThat(event.getReason()).isEqualTo(SCALING_REPORT_REASON);
         
assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString());
         assertThat(event.getCount()).isEqualTo(expectedCount);
     }
@@ -531,7 +667,20 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase 
implements DatabaseTest
 
 /** Test {@link JdbcAutoScalerEventHandler} via Derby. */
 class DerbyJdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
-        implements DerbyTestBase {}
+        implements DerbyTestBase {
+
+    @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
+    @Override
+    void testCleanExpiredEvents(
+            boolean tryIdNotSequential,
+            int expiredRecordsNum,
+            Duration eventHandlerTtl,
+            int unexpiredRecordsNum) {}
+
+    @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
+    @Override
+    void testDeleteCounterWhenIdNotConsecutive() {}
+}
 
 /** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */
 class MySQL56JdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
@@ -547,4 +696,9 @@ class MySQL8JdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEvent
 
 /** Test {@link JdbcAutoScalerEventHandler} via Postgre SQL. */
 class PostgreSQLJdbcAutoscalerEventHandlerITCase extends 
AbstractJdbcAutoscalerEventHandlerITCase
-        implements PostgreSQLTestBase {}
+        implements PostgreSQLTestBase {
+
+    @Disabled("Disabled due to the column 'id' can only be updated to 
DEFAULT.")
+    @Override
+    void testDeleteCounterWhenIdNotConsecutive() {}
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
index 664a28bd..39ccd76c 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler.jdbc.event;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 
 import java.sql.Connection;
+import java.sql.Timestamp;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -31,12 +32,14 @@ class CountableJdbcEventInteractor extends 
JdbcEventInteractor {
     private final AtomicLong queryCounter;
     private final AtomicLong createCounter;
     private final AtomicLong updateCounter;
+    private final AtomicLong deleteExpiredCounter;
 
     public CountableJdbcEventInteractor(Connection conn) {
         super(conn);
         queryCounter = new AtomicLong();
         createCounter = new AtomicLong();
         updateCounter = new AtomicLong();
+        deleteExpiredCounter = new AtomicLong();
     }
 
     @Override
@@ -64,10 +67,21 @@ class CountableJdbcEventInteractor extends 
JdbcEventInteractor {
         super.updateEvent(id, message, eventCount);
     }
 
+    @Override
+    int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, 
Timestamp timestamp)
+            throws Exception {
+        deleteExpiredCounter.incrementAndGet();
+        return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, 
timestamp);
+    }
+
     public void assertCounters(
             long expectedQueryCounter, long expectedUpdateCounter, long 
expectedCreateCounter) {
         assertThat(queryCounter).hasValue(expectedQueryCounter);
         assertThat(updateCounter).hasValue(expectedUpdateCounter);
         assertThat(createCounter).hasValue(expectedCreateCounter);
     }
+
+    public void assertDeleteExpiredCounter(long expectedCounter) {
+        assertThat(deleteExpiredCounter).hasValue(expectedCounter);
+    }
 }
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
index 5d063caf..2f58ca6e 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.configuration.description.InlineElement;
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
 import static 
org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
+import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /** The factory of {@link AutoScalerEventHandler}. */
@@ -73,6 +74,7 @@ public class AutoscalerEventHandlerFactory {
             AutoScalerEventHandler<KEY, Context> 
createJdbcEventHandler(Configuration conf)
                     throws Exception {
         var conn = HikariJDBCUtil.getConnection(conf);
-        return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
+        return new JdbcAutoScalerEventHandler<>(
+                new JdbcEventInteractor(conn), 
conf.get(JDBC_EVENT_HANDLER_TTL));
     }
 }
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index a075b718..87875ec1 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -113,6 +113,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     public void close() {
         scheduledExecutorService.shutdownNow();
         scalingThreadPool.shutdownNow();
+        eventHandler.close();
     }
 
     /**
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
index edc30549..15b9b6a0 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
@@ -121,4 +121,12 @@ public class AutoscalerStandaloneOptions {
                                             code(EVENT_HANDLER_TYPE.key()),
                                             code("JDBC"))
                                     .build());
+
+    public static final ConfigOption<Duration> JDBC_EVENT_HANDLER_TTL =
+            autoscalerStandaloneConfig("jdbc.event-handler.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofDays(90))
+                    .withDescription(
+                            "The time to live based on create time for the 
JDBC event handler records. "
+                                    + "When the config is set as '0', the ttl 
strategy for the records would be disabled.");
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index afca8b8e..0fcf50b7 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.time.Duration;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -42,7 +43,8 @@ import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_
  * @param <Context> Instance of JobAutoScalerContext.
  */
 @Experimental
-public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>>
+        extends Closeable {
     String SCALING_SUMMARY_ENTRY =
             "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f 
-> %.2f | Target data rate %.2f}";
     String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism 
change:";
@@ -99,6 +101,9 @@ public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContex
                 interval);
     }
 
+    /** Close the related resource. */
+    default void close() {}
+
     static String scalingReport(Map<JobVertexID, ScalingSummary> 
scalingSummaries, String message) {
         StringBuilder sb = new StringBuilder(message);
         scalingSummaries.forEach(

Reply via email to