1996fanrui commented on code in PR #865: URL: https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1728372397
########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -152,4 +153,29 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception { + var sql = + "SELECT id from t_flink_autoscaler_event_handler " + + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) " + + " and create_time < ?"; Review Comment: `SELECT id, create_time FROM t_flink_autoscaler_event_handler order by id asc limit 1` is enough, we can check create_time in memory. ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + int deleted = 0, batch = 4098; + var count = 0L; + boolean cleanable = true; + for (long startId = minId; deleted == batch || cleanable; startId += batch) { Review Comment: 1. IIUC, `deleted == batch` isn't needed. If they are not equal, you updated the cleanable, so we check cleanable is enough. 2. If 1 is acceptable, we don't need the `for loop`. `while(true) loop` is enough, and we can break the loop if `cleanable` is false. 3. If 1 is acceptable, `deleted` can be defined inside of `loop`. 4. If 2 is acceptable, we don't need `cleanable`. We can check `jdbcEventInteractor.queryMinEventIdByCreateTime(date) == null`, if it's true, break directly. ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + int deleted = 0, batch = 4098; + var count = 0L; + boolean cleanable = true; + for (long startId = minId; deleted == batch || cleanable; startId += batch) { + var endId = startId + batch; + deleted = + jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate( + startId, endId, date); + count += deleted; + if (deleted != batch) { + cleanable = jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null; Review Comment: IIUC, if it's not null, we should update the `startId = jdbcEventInteractor.queryMinEventIdByCreateTime(date)`. ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + int deleted = 0, batch = 4098; + var count = 0L; Review Comment: ```suggestion var deletedTotalCount = 0L; ``` ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + int deleted = 0, batch = 4098; Review Comment: How about renaming it to batchSize? ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); + if (Objects.isNull(minId)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + int deleted = 0, batch = 4098; + var count = 0L; + boolean cleanable = true; + for (long startId = minId; deleted == batch || cleanable; startId += batch) { + var endId = startId + batch; + deleted = + jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate( + startId, endId, date); + count += deleted; + if (deleted != batch) { + cleanable = jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null; + } + log.debug( + "Deleted expired {} event handler records by batch successfully.", deleted); + Thread.sleep(10L); Review Comment: How about sleep 1s? One minute could clean 4096 * 60 = 240 k, it's totally enough. ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +135,49 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + var date = + Timestamp.from( + jdbcEventInteractor + .getCurrentInstant() + .minusMillis(eventHandlerTtl.toMillis())); + try { + Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); Review Comment: We could define a nested loops: - The outer loop check `Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date)`, and `minId != null` - The inner loop check `deleted == batchSize`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org