This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 907d0f32126 [FLINK-33881][state] Avoid copy and update value in 
TtlListState#getUnexpiredOrNull
907d0f32126 is described below

commit 907d0f32126b9f8acfc80f3f4098e71cb37f0e37
Author: Jinzhong Li <lijinzhong2...@gmail.com>
AuthorDate: Tue Jan 9 18:02:43 2024 +0800

    [FLINK-33881][state] Avoid copy and update value in 
TtlListState#getUnexpiredOrNull
---
 .../flink/runtime/state/ttl/TtlListState.java       | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index 797127582e1..fa4d50fa6b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -109,12 +109,27 @@ class TtlListState<K, N, T>
         }
 
         long currentTimestamp = timeProvider.currentTimestamp();
-        List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
         TypeSerializer<TtlValue<T>> elementSerializer =
                 ((ListSerializer<TtlValue<T>>) original.getValueSerializer())
                         .getElementSerializer();
-        for (TtlValue<T> ttlValue : ttlValues) {
-            if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+        int firstExpireElementIndex = -1;
+        for (int i = 0; i < ttlValues.size(); i++) {
+            TtlValue<T> ttlValue = ttlValues.get(i);
+            if (TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+                firstExpireElementIndex = i;
+                break;
+            }
+        }
+        if (firstExpireElementIndex == -1) {
+            return ttlValues;
+        }
+
+        List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
+        for (int i = 0; i < ttlValues.size(); i++) {
+            TtlValue<T> ttlValue = ttlValues.get(i);
+            if (i < firstExpireElementIndex
+                    || (i > firstExpireElementIndex
+                            && !TtlUtils.expired(ttlValue, ttl, 
currentTimestamp))) {
                 // we have to do the defensive copy to update the value
                 unexpired.add(elementSerializer.copy(ttlValue));
             }

Reply via email to