This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6a426b2f [FLINK-36039][autoscaler] Support clean historical event handler records in JDBC event handler (#865) 6a426b2f is described below commit 6a426b2ff60331b89d67371279f400f8761bf1f3 Author: Yuepeng Pan <flin...@126.com> AuthorDate: Mon Aug 26 11:29:07 2024 +0800 [FLINK-36039][autoscaler] Support clean historical event handler records in JDBC event handler (#865) --------- Co-authored-by: Rui Fan <fan...@apache.org> --- .../autoscaler_standalone_configuration.html | 6 + .../jdbc/event/JdbcAutoScalerEventHandler.java | 74 +++++++- .../autoscaler/jdbc/event/JdbcEventInteractor.java | 26 +++ .../AbstractJdbcAutoscalerEventHandlerITCase.java | 188 +++++++++++++++++++-- .../jdbc/event/CountableJdbcEventInteractor.java | 14 ++ .../standalone/AutoscalerEventHandlerFactory.java | 4 +- .../standalone/StandaloneAutoscalerExecutor.java | 1 + .../config/AutoscalerStandaloneOptions.java | 8 + .../autoscaler/event/AutoScalerEventHandler.java | 7 +- 9 files changed, 308 insertions(+), 20 deletions(-) diff --git a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html index 7d724b7f..45b5184f 100644 --- a/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html +++ b/docs/layouts/shortcodes/generated/autoscaler_standalone_configuration.html @@ -38,6 +38,12 @@ <td>Integer</td> <td>The port of flink cluster when the flink-cluster fetcher is used.</td> </tr> + <tr> + <td><h5>autoscaler.standalone.jdbc.event-handler.ttl</h5></td> + <td style="word-wrap: break-word;">90 d</td> + <td>Duration</td> + <td>The time to live based on create time for the JDBC event handler records. When the config is set as '0', the ttl strategy for the records would be disabled.</td> + </tr> <tr> <td><h5>autoscaler.standalone.jdbc.password-env-variable</h5></td> <td style="word-wrap: break-word;">"JDBC_PWD"</td> diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java index c42692a6..5b455809 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java @@ -18,18 +18,27 @@ package org.apache.flink.autoscaler.jdbc.event; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.autoscaler.JobAutoScalerContext; import org.apache.flink.autoscaler.ScalingSummary; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; +import java.sql.Timestamp; import java.time.Duration; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * The event handler which persists its event in JDBC related database. @@ -38,13 +47,34 @@ import java.util.Objects; * @param <Context> The job autoscaler context. */ @Experimental +@Slf4j public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> implements AutoScalerEventHandler<KEY, Context> { private final JdbcEventInteractor jdbcEventInteractor; + private final Duration eventHandlerTtl; + @Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner; - public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) { + public JdbcAutoScalerEventHandler( + JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) { this.jdbcEventInteractor = jdbcEventInteractor; + this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl); + + if (eventHandlerTtl.toMillis() <= 0) { + this.scheduledEventHandlerCleaner = null; + return; + } + this.scheduledEventHandlerCleaner = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("jdbc-autoscaler-events-cleaner") + .setDaemon(true) + .build()); + this.scheduledEventHandlerCleaner.scheduleAtFixedRate( + this::cleanExpiredEvents, + Duration.ofDays(1).toMillis(), + Duration.ofDays(1).toMillis(), + TimeUnit.MILLISECONDS); } @SneakyThrows @@ -104,6 +134,48 @@ public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContex } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + final var batchSize = 2000; + final var sleepMs = 1000; + + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + var deletedTotalCount = 0L; + while (true) { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.info( + "Deleted expired {} event handler records successfully", + deletedTotalCount); + break; + } + + for (long startId = minId; + jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate( + startId, startId + batchSize, date) + == batchSize; + startId += batchSize) { + Thread.sleep(sleepMs); + } + } + } catch (Exception e) { + log.error("Error in cleaning expired event handler records.", e); + } + } + /** * @return True means the existing event is still in the interval duration we can update it. * Otherwise, it's too early, we should create a new one instead of updating it. diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java index 5e2d6166..ce3fc48d 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java @@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.sql.Connection; import java.sql.ResultSet; @@ -152,4 +153,29 @@ public class JdbcEventInteractor { void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception { + var sql = + "SELECT id from t_flink_autoscaler_event_handler " + + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) " + + " and create_time < ?"; + try (var pstmt = conn.prepareStatement(sql)) { + pstmt.setObject(1, timestamp); + ResultSet resultSet = pstmt.executeQuery(); + return resultSet.next() ? resultSet.getLong(1) : null; + } + } + + int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp) + throws Exception { + var query = + "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?"; + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setObject(1, startId); + pstmt.setObject(2, endId); + pstmt.setObject(3, timestamp); + return pstmt.executeUpdate(); + } + } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java index b08030f1..f5278171 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java @@ -33,19 +33,30 @@ import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Timestamp; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.stream.Stream; import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON; import static org.assertj.core.api.Assertions.assertThat; /** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */ @@ -60,6 +71,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest private final Instant createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); private final Map<JobVertexID, ScalingSummary> scalingSummaries = generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent); + private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault()); private CountableJdbcEventInteractor jdbcEventInteractor; private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler; @@ -68,11 +80,16 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest @BeforeEach void beforeEach() throws Exception { jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection()); - jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault())); - eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor); + jdbcEventInteractor.setClock(defaultClock); + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO); ctx = createDefaultJobAutoScalerContext(); } + @AfterEach + void tearDown() { + eventHandler.close(); + } + /** All events shouldn't be deduplicated when interval is null. */ @Test void testEventWithoutInterval() throws Exception { @@ -254,8 +271,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .singleElement() .satisfies( event -> { @@ -283,8 +299,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .as("All scaling events shouldn't be deduplicated when scaling happens.") .hasSize(2) .satisfiesExactlyInAnyOrder( @@ -322,8 +337,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .as( "The event should be deduplicated when parallelism is not changed and within the interval.") .singleElement() @@ -360,8 +374,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .as("We should create a new event when the old event is too early.") .hasSize(2) .satisfiesExactlyInAnyOrder( @@ -401,8 +414,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .as("We should create a new event when the old event is too early.") .hasSize(2) .satisfiesExactlyInAnyOrder( @@ -430,6 +442,131 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest }); } + @Test + void testDeleteCounterWhenIdNotConsecutive() throws Exception { + // Create 2 events. + final Duration ttl = Duration.ofDays(1L); + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, ttl); + initTestingEventHandlerRecords(2); + + // Simulate ids are not consecutive. + var events = + jdbcEventInteractor.queryEvents(ctx.getJobKey().toString(), SCALING_REPORT_REASON); + assertThat(events).hasSize(2); + var maxId = + events.stream() + .map(AutoScalerEvent::getId) + .max(Comparable::compareTo) + .orElseThrow(); + + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "update t_flink_autoscaler_event_handler set id = ? where id = ?")) { + ps.setLong(1, maxId + 1_000_000); + ps.setLong(2, maxId); + ps.execute(); + } + + // Reset the clock to clean all expired data. + jdbcEventInteractor.setClock( + Clock.fixed( + jdbcEventInteractor + .getCurrentInstant() + .plus(ttl) + .plus(Duration.ofMillis(1)), + ZoneId.systemDefault())); + + eventHandler.cleanExpiredEvents(); + jdbcEventInteractor.assertDeleteExpiredCounter(2L); + } + + private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() { + return Stream.of( + Arguments.of(false, 128, Duration.ofMinutes(2), 10), + Arguments.of(true, 256, Duration.ofMinutes(2), 0), + Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 12), + Arguments.of(true, 1024 * 9, Duration.ofMinutes(2), 0), + Arguments.of(true, 512, Duration.ofMinutes(100), 3), + Arguments.of(false, 64, Duration.ofMinutes(100), 0), + Arguments.of(true, 1024 * 9, Duration.ofMinutes(100), 64), + Arguments.of(false, 1024 * 9, Duration.ofMinutes(100), 0), + Arguments.of(false, 0, Duration.ofMinutes(100), 128), + Arguments.of(false, 0, Duration.ofMinutes(100), 0)); + } + + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest( + name = + "tryIdNotSequential:{0}, expiredRecordsNum: {1}, eventHandlerTtl: {2}, unexpiredRecordsNum: {3}") + void testCleanExpiredEvents( + boolean tryIdNotSequential, + int expiredRecordsNum, + Duration eventHandlerTtl, + int unexpiredRecordsNum) + throws Exception { + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl); + + // Init the expired records. + initTestingEventHandlerRecords(expiredRecordsNum); + if (tryIdNotSequential) { + tryDeleteOneRecord(expiredRecordsNum); + } + var expiredInstant = jdbcEventInteractor.getCurrentInstant(); + + // Init the unexpired records. + initTestingEventHandlerRecords(unexpiredRecordsNum); + + // Reset the clock to clean all expired data. + jdbcEventInteractor.setClock( + Clock.fixed( + expiredInstant.plus(eventHandlerTtl).plus(Duration.ofMillis(1)), + ZoneId.systemDefault())); + + eventHandler.cleanExpiredEvents(); + + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "select count(1) from t_flink_autoscaler_event_handler"); + ResultSet countResultSet = ps.executeQuery()) { + countResultSet.next(); + assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum); + } + } + + private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception { + // To simulate non-sequential IDs in expired records. + Timestamp date = Timestamp.from(createTime); + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (minId == null) { + return; + } + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "delete from t_flink_autoscaler_event_handler where id = ?")) { + ps.setObject(1, (minId + expiredRecordsNum) / 2); + ps.execute(); + } + } + + private void initTestingEventHandlerRecords(int recordsNum) { + for (int i = 0; i < recordsNum; i++) { + jdbcEventInteractor.setClock( + Clock.fixed( + jdbcEventInteractor.getCurrentInstant().plusSeconds(1), + ZoneId.systemDefault())); + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + SCALING_REPORT_REASON, + "message-" + i, + "messageKey-" + i, + null); + } + } + private void createFirstScalingEvent() throws Exception { jdbcEventInteractor.assertCounters(0, 0, 0); eventHandler.handleScalingEvent( @@ -441,8 +578,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat( jdbcEventInteractor.queryEvents( - ctx.getJobKey().toString(), - AutoScalerEventHandler.SCALING_REPORT_REASON)) + ctx.getJobKey().toString(), SCALING_REPORT_REASON)) .singleElement() .satisfies( event -> { @@ -523,7 +659,7 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest assertThat(event.getCreateTime()).isEqualTo(expectedCreateTime); assertThat(event.getUpdateTime()).isEqualTo(expectedUpdateTime); assertThat(event.getJobKey()).isEqualTo(ctx.getJobKey().toString()); - assertThat(event.getReason()).isEqualTo(AutoScalerEventHandler.SCALING_REPORT_REASON); + assertThat(event.getReason()).isEqualTo(SCALING_REPORT_REASON); assertThat(event.getEventType()).isEqualTo(AutoScalerEventHandler.Type.Normal.toString()); assertThat(event.getCount()).isEqualTo(expectedCount); } @@ -531,7 +667,20 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest /** Test {@link JdbcAutoScalerEventHandler} via Derby. */ class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase - implements DerbyTestBase {} + implements DerbyTestBase { + + @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.") + @Override + void testCleanExpiredEvents( + boolean tryIdNotSequential, + int expiredRecordsNum, + Duration eventHandlerTtl, + int unexpiredRecordsNum) {} + + @Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.") + @Override + void testDeleteCounterWhenIdNotConsecutive() {} +} /** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */ class MySQL56JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase @@ -547,4 +696,9 @@ class MySQL8JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEvent /** Test {@link JdbcAutoScalerEventHandler} via Postgre SQL. */ class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase - implements PostgreSQLTestBase {} + implements PostgreSQLTestBase { + + @Disabled("Disabled due to the column 'id' can only be updated to DEFAULT.") + @Override + void testDeleteCounterWhenIdNotConsecutive() {} +} diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java index 664a28bd..39ccd76c 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java @@ -20,6 +20,7 @@ package org.apache.flink.autoscaler.jdbc.event; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import java.sql.Connection; +import java.sql.Timestamp; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -31,12 +32,14 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor { private final AtomicLong queryCounter; private final AtomicLong createCounter; private final AtomicLong updateCounter; + private final AtomicLong deleteExpiredCounter; public CountableJdbcEventInteractor(Connection conn) { super(conn); queryCounter = new AtomicLong(); createCounter = new AtomicLong(); updateCounter = new AtomicLong(); + deleteExpiredCounter = new AtomicLong(); } @Override @@ -64,10 +67,21 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor { super.updateEvent(id, message, eventCount); } + @Override + int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp) + throws Exception { + deleteExpiredCounter.incrementAndGet(); + return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, timestamp); + } + public void assertCounters( long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) { assertThat(queryCounter).hasValue(expectedQueryCounter); assertThat(updateCounter).hasValue(expectedUpdateCounter); assertThat(createCounter).hasValue(expectedCreateCounter); } + + public void assertDeleteExpiredCounter(long expectedCounter) { + assertThat(deleteExpiredCounter).hasValue(expectedCounter); + } } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java index 5d063caf..2f58ca6e 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.description.InlineElement; import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC; import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_EVENT_HANDLER_TTL; import static org.apache.flink.configuration.description.TextElement.text; /** The factory of {@link AutoScalerEventHandler}. */ @@ -73,6 +74,7 @@ public class AutoscalerEventHandlerFactory { AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf) throws Exception { var conn = HikariJDBCUtil.getConnection(conf); - return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn)); + return new JdbcAutoScalerEventHandler<>( + new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL)); } } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java index a075b718..87875ec1 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java @@ -113,6 +113,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont public void close() { scheduledExecutorService.shutdownNow(); scalingThreadPool.shutdownNow(); + eventHandler.close(); } /** diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java index edc30549..15b9b6a0 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java @@ -121,4 +121,12 @@ public class AutoscalerStandaloneOptions { code(EVENT_HANDLER_TYPE.key()), code("JDBC")) .build()); + + public static final ConfigOption<Duration> JDBC_EVENT_HANDLER_TTL = + autoscalerStandaloneConfig("jdbc.event-handler.ttl") + .durationType() + .defaultValue(Duration.ofDays(90)) + .withDescription( + "The time to live based on create time for the JDBC event handler records. " + + "When the config is set as '0', the ttl strategy for the records would be disabled."); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index afca8b8e..0fcf50b7 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import javax.annotation.Nullable; +import java.io.Closeable; import java.time.Duration; import java.util.Map; import java.util.stream.Collectors; @@ -42,7 +43,8 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_ * @param <Context> Instance of JobAutoScalerContext. */ @Experimental -public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> { +public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> + extends Closeable { String SCALING_SUMMARY_ENTRY = "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}"; String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:"; @@ -99,6 +101,9 @@ public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContex interval); } + /** Close the related resource. */ + default void close() {} + static String scalingReport(Map<JobVertexID, ScalingSummary> scalingSummaries, String message) { StringBuilder sb = new StringBuilder(message); scalingSummaries.forEach(