This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 907d0f32126 [FLINK-33881][state] Avoid copy and update value in TtlListState#getUnexpiredOrNull 907d0f32126 is described below commit 907d0f32126b9f8acfc80f3f4098e71cb37f0e37 Author: Jinzhong Li <lijinzhong2...@gmail.com> AuthorDate: Tue Jan 9 18:02:43 2024 +0800 [FLINK-33881][state] Avoid copy and update value in TtlListState#getUnexpiredOrNull --- .../flink/runtime/state/ttl/TtlListState.java | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java index 797127582e1..fa4d50fa6b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java @@ -109,12 +109,27 @@ class TtlListState<K, N, T> } long currentTimestamp = timeProvider.currentTimestamp(); - List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); TypeSerializer<TtlValue<T>> elementSerializer = ((ListSerializer<TtlValue<T>>) original.getValueSerializer()) .getElementSerializer(); - for (TtlValue<T> ttlValue : ttlValues) { - if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) { + int firstExpireElementIndex = -1; + for (int i = 0; i < ttlValues.size(); i++) { + TtlValue<T> ttlValue = ttlValues.get(i); + if (TtlUtils.expired(ttlValue, ttl, currentTimestamp)) { + firstExpireElementIndex = i; + break; + } + } + if (firstExpireElementIndex == -1) { + return ttlValues; + } + + List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); + for (int i = 0; i < ttlValues.size(); i++) { + TtlValue<T> ttlValue = ttlValues.get(i); + if (i < firstExpireElementIndex + || (i > firstExpireElementIndex + && !TtlUtils.expired(ttlValue, ttl, currentTimestamp))) { // we have to do the defensive copy to update the value unexpired.add(elementSerializer.copy(ttlValue)); }