1996fanrui commented on code in PR #865:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1728372397


##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java:
##########
@@ -152,4 +153,29 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, 
String reason) throws
     void setClock(@Nonnull Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    @Nullable
+    Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
+        var sql =
+                "SELECT id from t_flink_autoscaler_event_handler "
+                        + "           where id = (SELECT id FROM 
t_flink_autoscaler_event_handler order by id asc limit 1) "
+                        + "           and create_time < ?";

Review Comment:
   `SELECT id, create_time FROM t_flink_autoscaler_event_handler order by id 
asc limit 1` is enough, we can check create_time in memory.



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+            if (Objects.isNull(minId)) {
+                log.warn("No expired event handlers queried at {}", new 
Date());
+                return;
+            }
+            int deleted = 0, batch = 4098;
+            var count = 0L;
+            boolean cleanable = true;
+            for (long startId = minId; deleted == batch || cleanable; startId 
+= batch) {

Review Comment:
   1. IIUC, `deleted == batch` isn't needed. If they are not equal, you updated 
the cleanable, so we check cleanable is enough.
   2. If 1 is acceptable, we don't need the `for loop`. `while(true) loop` is 
enough, and we can break the loop if `cleanable` is false.
   3. If 1 is acceptable, `deleted` can be defined inside of `loop`.
   4. If 2 is acceptable, we don't need `cleanable`. We can check 
`jdbcEventInteractor.queryMinEventIdByCreateTime(date) == null`, if it's true, 
break directly.



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+            if (Objects.isNull(minId)) {
+                log.warn("No expired event handlers queried at {}", new 
Date());
+                return;
+            }
+            int deleted = 0, batch = 4098;
+            var count = 0L;
+            boolean cleanable = true;
+            for (long startId = minId; deleted == batch || cleanable; startId 
+= batch) {
+                var endId = startId + batch;
+                deleted =
+                        
jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
+                                startId, endId, date);
+                count += deleted;
+                if (deleted != batch) {
+                    cleanable = 
jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null;

Review Comment:
   IIUC, if it's not null, we should update the `startId = 
jdbcEventInteractor.queryMinEventIdByCreateTime(date)`.



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+            if (Objects.isNull(minId)) {
+                log.warn("No expired event handlers queried at {}", new 
Date());
+                return;
+            }
+            int deleted = 0, batch = 4098;
+            var count = 0L;

Review Comment:
   ```suggestion
               var deletedTotalCount = 0L;
   ```



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+            if (Objects.isNull(minId)) {
+                log.warn("No expired event handlers queried at {}", new 
Date());
+                return;
+            }
+            int deleted = 0, batch = 4098;

Review Comment:
   How about renaming it to batchSize?



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);
+            if (Objects.isNull(minId)) {
+                log.warn("No expired event handlers queried at {}", new 
Date());
+                return;
+            }
+            int deleted = 0, batch = 4098;
+            var count = 0L;
+            boolean cleanable = true;
+            for (long startId = minId; deleted == batch || cleanable; startId 
+= batch) {
+                var endId = startId + batch;
+                deleted =
+                        
jdbcEventInteractor.deleteExpiredEventsByIdRangeAndDate(
+                                startId, endId, date);
+                count += deleted;
+                if (deleted != batch) {
+                    cleanable = 
jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null;
+                }
+                log.debug(
+                        "Deleted expired {} event handler records by batch 
successfully.", deleted);
+                Thread.sleep(10L);

Review Comment:
   How about sleep 1s?
   
   One minute could clean 4096 * 60 = 240 k, it's totally enough.



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -104,6 +135,49 @@ public void handleScalingEvent(
         }
     }
 
+    @Override
+    public void close() {
+        if (Objects.nonNull(scheduledEventHandlerCleaner)
+                && !scheduledEventHandlerCleaner.isShutdown()) {
+            scheduledEventHandlerCleaner.shutdownNow();
+        }
+    }
+
+    @VisibleForTesting
+    void cleanExpiredEvents() {
+        var date =
+                Timestamp.from(
+                        jdbcEventInteractor
+                                .getCurrentInstant()
+                                .minusMillis(eventHandlerTtl.toMillis()));
+        try {
+            Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date);

Review Comment:
   We could define a nested loops:
   
   - The outer loop check `Long minId = 
jdbcEventInteractor.queryMinEventIdByCreateTime(date)`, and `minId != null`
   - The inner loop check `deleted == batchSize`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to