This is an automated email from the ASF dual-hosted git repository.
xintongsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 39c5878c [Feature] Support TTL on ShortTermMemory (#657)
39c5878c is described below
commit 39c5878c58b08c15d6302688ed91745d6ab09ffd
Author: daken <[email protected]>
AuthorDate: Sun May 31 14:17:27 2026 +0800
[Feature] Support TTL on ShortTermMemory (#657)
---
.../agents/api/agents/AgentExecutionOptions.java | 22 +++
.../api/agents/ShortTermMemoryTtlUpdate.java | 25 +++
.../api/agents/ShortTermMemoryTtlVisibility.java | 25 +++
python/flink_agents/api/core_options.py | 37 +++++
.../short_term_memory_ttl_test.py | 178 +++++++++++++++++++++
.../create_python_option_from_java_option.py | 40 ++++-
.../runtime/operator/ActionExecutionOperator.java | 2 +-
.../runtime/operator/OperatorStateManager.java | 73 ++++++++-
.../memory/ShortTermMemoryTTLIntegrationTest.java | 158 ++++++++++++++++++
9 files changed, 554 insertions(+), 6 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
index 2b8751d4..a0b9269e 100644
---
a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java
@@ -47,4 +47,26 @@ public class AgentExecutionOptions {
public static final ConfigOption<Boolean> RAG_ASYNC =
new ConfigOption<>("rag.async", Boolean.class, true);
+
+ /** Set to a positive value in milliseconds to enable short-term memory
TTL; 0 disables it. */
+ public static final ConfigOption<Long> SHORT_TERM_MEMORY_STATE_TTL_MS =
+ new ConfigOption<>("short-term-memory.state-ttl.ms", Long.class,
0L);
+
+ /** Update policy for short-term memory TTL, consulted only when TTL is
enabled. */
+ public static final ConfigOption<ShortTermMemoryTtlUpdate>
+ SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE =
+ new ConfigOption<>(
+ "short-term-memory.state-ttl.update-type",
+ ShortTermMemoryTtlUpdate.class,
+ ShortTermMemoryTtlUpdate.ON_READ_AND_WRITE);
+
+ /**
+ * Visibility policy for expired short-term memory state, consulted only
when TTL is enabled.
+ */
+ public static final ConfigOption<ShortTermMemoryTtlVisibility>
+ SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY =
+ new ConfigOption<>(
+ "short-term-memory.state-ttl.visibility",
+ ShortTermMemoryTtlVisibility.class,
+ ShortTermMemoryTtlVisibility.NEVER_RETURN_EXPIRED);
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlUpdate.java
b/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlUpdate.java
new file mode 100644
index 00000000..06b92de5
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlUpdate.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.agents;
+
+/** Defines when short-term memory state TTL is refreshed. */
+public enum ShortTermMemoryTtlUpdate {
+ ON_CREATE_AND_WRITE,
+ ON_READ_AND_WRITE
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlVisibility.java
b/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlVisibility.java
new file mode 100644
index 00000000..0e252c6c
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/agents/ShortTermMemoryTtlVisibility.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.agents;
+
+/** Defines whether expired short-term memory state can be returned before
cleanup. */
+public enum ShortTermMemoryTtlVisibility {
+ NEVER_RETURN_EXPIRED,
+ RETURN_EXPIRED_IF_NOT_CLEANED_UP
+}
diff --git a/python/flink_agents/api/core_options.py
b/python/flink_agents/api/core_options.py
index 5b575c3f..f5247616 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -82,6 +82,20 @@ class ErrorHandlingStrategy(Enum):
IGNORE = "ignore"
+class ShortTermMemoryTtlUpdate(Enum):
+ """Update policy for short-term memory TTL."""
+
+ ON_CREATE_AND_WRITE = "ON_CREATE_AND_WRITE"
+ ON_READ_AND_WRITE = "ON_READ_AND_WRITE"
+
+
+class ShortTermMemoryTtlVisibility(Enum):
+ """Visibility policy for expired short-term memory state."""
+
+ NEVER_RETURN_EXPIRED = "NEVER_RETURN_EXPIRED"
+ RETURN_EXPIRED_IF_NOT_CLEANED_UP = "RETURN_EXPIRED_IF_NOT_CLEANED_UP"
+
+
class LoggerType(Enum):
"""Built-in event logger types.
@@ -179,3 +193,26 @@ class AgentExecutionOptions:
config_type=bool,
default=True,
)
+
+ # Set to a positive value in milliseconds to enable short-term memory TTL;
+ # 0 or a negative value disables it.
+ SHORT_TERM_MEMORY_STATE_TTL_MS = ConfigOption(
+ key="short-term-memory.state-ttl.ms",
+ config_type=int,
+ default=0,
+ )
+
+ # Update policy for short-term memory TTL, consulted only when TTL is
enabled.
+ SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE = ConfigOption(
+ key="short-term-memory.state-ttl.update-type",
+ config_type=ShortTermMemoryTtlUpdate,
+ default=ShortTermMemoryTtlUpdate.ON_READ_AND_WRITE,
+ )
+
+ # Visibility policy for expired short-term memory state, consulted only
when TTL
+ # is enabled.
+ SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY = ConfigOption(
+ key="short-term-memory.state-ttl.visibility",
+ config_type=ShortTermMemoryTtlVisibility,
+ default=ShortTermMemoryTtlVisibility.NEVER_RETURN_EXPIRED,
+ )
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/short_term_memory_ttl_test.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/short_term_memory_ttl_test.py
new file mode 100644
index 00000000..19c149fc
--- /dev/null
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/short_term_memory_ttl_test.py
@@ -0,0 +1,178 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+import sysconfig
+import time
+from pathlib import Path
+from typing import Any
+
+from pydantic import BaseModel
+from pyflink.common import Configuration
+from pyflink.datastream import KeySelector, StreamExecutionEnvironment
+
+from flink_agents.api.agents.agent import Agent
+from flink_agents.api.core_options import (
+ AgentExecutionOptions,
+ ShortTermMemoryTtlUpdate,
+ ShortTermMemoryTtlVisibility,
+)
+from flink_agents.api.decorators import action
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.runner_context import RunnerContext
+
+current_dir = Path(__file__).parent
+os.environ["PYTHONPATH"] = (
+ f"{current_dir.parent.parent.parent}:{sysconfig.get_paths()['purelib']}"
+)
+
+
+class TtlTestInput(BaseModel):
+ event_key: str
+ sleep_ms: int
+
+
+class TtlTestKeySelector(KeySelector):
+ def get_key(self, value: TtlTestInput) -> str:
+ return "test_key"
+
+
+class ShortTermMemoryTtlTestAgent(Agent):
+ @action(InputEvent.EVENT_TYPE)
+ @staticmethod
+ def input(event: Event, ctx: RunnerContext) -> None:
+ input_data =
TtlTestInput.model_validate(InputEvent.from_event(event).input)
+
+ short_term_memory = ctx.short_term_memory
+ existing_value = short_term_memory.get(input_data.event_key)
+ current_count = 0
+ if isinstance(existing_value, int):
+ current_count = existing_value
+ elif isinstance(existing_value, float):
+ current_count = int(existing_value)
+
+ short_term_memory.set(input_data.event_key, current_count + 1)
+ time.sleep(input_data.sleep_ms / 1000)
+ ctx.send_event(
+ OutputEvent(
+ output=(
+ f"{input_data.event_key}|"
+ f"{'NEW' if existing_value is None else 'EXISTING'}"
+ )
+ )
+ )
+
+
+def run_scenario(
+ ttl_ms: int,
+ sleep_ms: int,
+ *,
+ configure_ttl_ms: bool,
+ configure_ttl_options: bool,
+) -> list[Any]:
+ config = Configuration()
+ config.set_string("python.pythonpath", os.environ["PYTHONPATH"])
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_parallelism(1)
+
+ input_stream = env.from_collection(
+ [
+ TtlTestInput(event_key="event1", sleep_ms=sleep_ms),
+ TtlTestInput(event_key="event2", sleep_ms=sleep_ms),
+ TtlTestInput(event_key="event1", sleep_ms=sleep_ms),
+ ]
+ )
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ agents_config = agents_env.get_config()
+ if configure_ttl_ms:
+
agents_config.set(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS, ttl_ms)
+ if configure_ttl_options:
+ agents_config.set(
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE,
+ ShortTermMemoryTtlUpdate.ON_CREATE_AND_WRITE,
+ )
+ agents_config.set(
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY,
+ ShortTermMemoryTtlVisibility.NEVER_RETURN_EXPIRED,
+ )
+
+ output_datastream = (
+ agents_env.from_datastream(
+ input=input_stream, key_selector=TtlTestKeySelector()
+ )
+ .apply(ShortTermMemoryTtlTestAgent())
+ .to_datastream()
+ )
+
+ return list(output_datastream.execute_and_collect())
+
+
+def test_value_still_visible_before_ttl_expiry() -> None:
+ results = run_scenario(
+ 1000,
+ 0,
+ configure_ttl_ms=True,
+ configure_ttl_options=True,
+ )
+
+ assert results == ["event1|NEW", "event2|NEW", "event1|EXISTING"]
+
+
+def test_ttl_configuration_disabled_with_zero_ttl() -> None:
+ results = run_scenario(
+ 0,
+ 50,
+ configure_ttl_ms=True,
+ configure_ttl_options=True,
+ )
+
+ assert results == ["event1|NEW", "event2|NEW", "event1|EXISTING"]
+
+
+def test_ttl_configuration_disabled_by_default() -> None:
+ results = run_scenario(
+ 0,
+ 50,
+ configure_ttl_ms=False,
+ configure_ttl_options=True,
+ )
+
+ assert results == ["event1|NEW", "event2|NEW", "event1|EXISTING"]
+
+
+def test_value_expires_after_ttl() -> None:
+ results = run_scenario(
+ 50,
+ 200,
+ configure_ttl_ms=True,
+ configure_ttl_options=True,
+ )
+
+ assert results == ["event1|NEW", "event2|NEW", "event1|NEW"]
+
+
+def test_ttl_configuration_applied_with_default_update_type_and_visibility()
-> None:
+ results = run_scenario(
+ 50,
+ 200,
+ configure_ttl_ms=True,
+ configure_ttl_options=False,
+ )
+
+ assert results == ["event1|NEW", "event2|NEW", "event1|NEW"]
diff --git
a/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
index c2da3714..b7251ee1 100644
---
a/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
+++
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
@@ -19,7 +19,12 @@ from pathlib import Path
from pyflink.util.java_utils import add_jars_to_context_class_loader
-from flink_agents.api.core_options import AgentConfigOptions
+from flink_agents.api.core_options import (
+ AgentConfigOptions,
+ AgentExecutionOptions,
+ ShortTermMemoryTtlUpdate,
+ ShortTermMemoryTtlVisibility,
+)
# This script is used to verify that Java-defined configuration options
# (e.g., AgentConfigOptions) are correctly exposed and accessible in the
@@ -39,3 +44,36 @@ if __name__ == "__main__":
assert AgentConfigOptions.BASE_LOG_DIR.get_key() == "baseLogDir"
assert AgentConfigOptions.BASE_LOG_DIR.get_type() is str
assert AgentConfigOptions.BASE_LOG_DIR.get_default_value() is None
+
+ assert (
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS.get_key()
+ == "short-term-memory.state-ttl.ms"
+ )
+ assert AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS.get_type() is
int
+ assert
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS.get_default_value() == 0
+
+ assert (
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE.get_key()
+ == "short-term-memory.state-ttl.update-type"
+ )
+ assert (
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE.get_type()
+ is ShortTermMemoryTtlUpdate
+ )
+ assert (
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE.get_default_value()
+ is ShortTermMemoryTtlUpdate.ON_READ_AND_WRITE
+ )
+
+ assert (
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY.get_key()
+ == "short-term-memory.state-ttl.visibility"
+ )
+ assert (
+ AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY.get_type()
+ is ShortTermMemoryTtlVisibility
+ )
+ assert (
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY.get_default_value()
+ is ShortTermMemoryTtlVisibility.NEVER_RETURN_EXPIRED
+ )
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index a0a3813d..caefb955 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -141,7 +141,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
public void open() throws Exception {
super.open();
- stateManager.initializeKeyedStates(getRuntimeContext());
+ stateManager.initializeKeyedStates(getRuntimeContext(),
agentPlan.getConfig());
stateManager.initializeOperatorStates(getOperatorStateBackend());
// ResourceCache constructs its own long-lived ResourceContextImpl
internally; on
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
index 843a0207..eee3c2dd 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java
@@ -19,11 +19,17 @@
package org.apache.flink.agents.runtime.operator;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlUpdate;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlVisibility;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -37,6 +43,8 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import javax.annotation.Nullable;
+import java.time.Duration;
+
import static org.apache.flink.agents.runtime.utils.StateUtil.*;
/**
@@ -56,9 +64,9 @@ import static
org.apache.flink.agents.runtime.utils.StateUtil.*;
*
* <p>Lifecycle: instantiated by the operator's {@code initializeState()} (the
Flink lifecycle runs
* {@code initializeState} before {@code open}). Both {@link
- *
#initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext)}
and {@link
- * #initializeOperatorStates(OperatorStateBackend)} are invoked later from the
operator's {@code
- * open()}. There is no explicit close — the underlying state handles are
owned by Flink.
+ *
#initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext,
AgentConfiguration)} and
+ * {@link #initializeOperatorStates(OperatorStateBackend)} are invoked later
from the operator's
+ * {@code open()}. There is no explicit close — the underlying state handles
are owned by Flink.
*
* <p>Design constraint: package-private; no manager-to-manager held
references. Cross-cutting data
* flows via method parameters (see for example {@link
ActionTaskContextManager#transferContexts}
@@ -87,7 +95,9 @@ class OperatorStateManager {
*
* @param runtimeContext the operator's runtime context, used to obtain
keyed state handles.
*/
- void
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
runtimeContext)
+ void initializeKeyedStates(
+ org.apache.flink.api.common.functions.RuntimeContext
runtimeContext,
+ AgentConfiguration agentConfiguration)
throws Exception {
// init sensoryMemState
MapStateDescriptor<String, MemoryObjectImpl.MemoryItem>
sensoryMemStateDescriptor =
@@ -103,6 +113,7 @@ class OperatorStateManager {
"shortTermMemory",
TypeInformation.of(String.class),
TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
+ maybeEnableShortTermMemoryTTL(shortTermMemStateDescriptor,
agentConfiguration);
shortTermMemState =
runtimeContext.getMapState(shortTermMemStateDescriptor);
// init sequence number state for per key message ordering
@@ -121,6 +132,60 @@ class OperatorStateManager {
PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)));
}
+ /**
+ * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is
positive, attaches Flink
+ * {@link StateTtlConfig} to the short-term memory {@link
MapStateDescriptor}. Unset, null, or
+ * non-positive values disable TTL (Flink does not allow zero/negative
TTL).
+ */
+ private void maybeEnableShortTermMemoryTTL(
+ MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+ AgentConfiguration agentConfiguration) {
+ Long ttlMs =
agentConfiguration.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+ if (ttlMs == null || ttlMs <= 0) {
+ return;
+ }
+
+ ShortTermMemoryTtlUpdate updateType =
+ agentConfiguration.get(
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+ ShortTermMemoryTtlVisibility stateVisibility =
+ agentConfiguration.get(
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+ StateTtlConfig ttlConfig =
+ StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+ .setUpdateType(toFlinkUpdateType(updateType))
+
.setStateVisibility(toFlinkStateVisibility(stateVisibility))
+ .cleanupFullSnapshot()
+ .build();
+ descriptor.enableTimeToLive(ttlConfig);
+ }
+
+ private StateTtlConfig.UpdateType
toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+ switch (updateType) {
+ case ON_CREATE_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnCreateAndWrite;
+ case ON_READ_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnReadAndWrite;
+ default:
+ throw new IllegalArgumentException("Unsupported TTL update
type: " + updateType);
+ }
+ }
+
+ private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+ ShortTermMemoryTtlVisibility stateVisibility) {
+ switch (stateVisibility) {
+ case NEVER_RETURN_EXPIRED:
+ return StateTtlConfig.StateVisibility.NeverReturnExpired;
+ case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+ return
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported TTL state visibility: " +
stateVisibility);
+ }
+ }
+
/**
* Registers operator-level (non-keyed) state.
*
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java
new file mode 100644
index 00000000..1e911384
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.memory;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlUpdate;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlVisibility;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.context.MemoryObject;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Integration test for Short-Term Memory TTL functionality. */
+class ShortTermMemoryTTLIntegrationTest {
+
+ private static final String MEMORY_KEY = "test_key";
+
+ private static final class TestInput {
+ public String eventKey;
+ public long sleepMs;
+
+ private TestInput() {}
+
+ private TestInput(String eventKey, long sleepMs) {
+ this.eventKey = eventKey;
+ this.sleepMs = sleepMs;
+ }
+ }
+
+ public static class TTLTestAgent extends Agent {
+
+ @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
+ public static void input(org.apache.flink.agents.api.Event event,
RunnerContext ctx)
+ throws Exception {
+ InputEvent inputEvent = (InputEvent) event;
+ TestInput input = (TestInput) inputEvent.getInput();
+
+ MemoryObject shortTermMemory = ctx.getShortTermMemory();
+ MemoryObject memoryObject = shortTermMemory.get(input.eventKey);
+
+ Object existingValue = null;
+ int currentCount = 0;
+ if (memoryObject != null && !memoryObject.isNestedObject()) {
+ existingValue = memoryObject.getValue();
+ if (existingValue instanceof Integer) {
+ currentCount = (Integer) existingValue;
+ } else if (existingValue instanceof Number) {
+ currentCount = ((Number) existingValue).intValue();
+ }
+ }
+
+ shortTermMemory.set(input.eventKey, currentCount + 1);
+ Thread.sleep(input.sleepMs);
+ ctx.sendEvent(
+ new OutputEvent(
+ input.eventKey + "|" + (existingValue == null ?
"NEW" : "EXISTING")));
+ }
+ }
+
+ @Test
+ void testValueStillVisibleBeforeTTLExpiry() throws Exception {
+ List<String> results = runScenario(1000L, 0L, true, true);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"),
results);
+ }
+
+ @Test
+ void testTTLConfigurationDisabledWithZeroTtl() throws Exception {
+ List<String> results = runScenario(0L, 50L, true, true);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"),
results);
+ }
+
+ @Test
+ void testTTLConfigurationDisabledByDefault() throws Exception {
+ List<String> results = runScenario(0L, 50L, false, true);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"),
results);
+ }
+
+ @Test
+ void testValueExpiresAfterTTL() throws Exception {
+ List<String> results = runScenario(50L, 200L, true, true);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|NEW"),
results);
+ }
+
+ @Test
+ void testTTLConfigurationAppliedWithDefaultUpdateTypeAndVisibility()
throws Exception {
+ List<String> results = runScenario(50L, 200L, true, false);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|NEW"),
results);
+ }
+
+ private static List<String> runScenario(
+ long ttlMs, long sleepMs, boolean configureTtlMs, boolean
configureTtlOptions)
+ throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ AgentsExecutionEnvironment agentEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+ AgentConfiguration agentsConfig = (AgentConfiguration)
agentEnv.getConfig();
+ if (configureTtlMs) {
+
agentsConfig.set(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS, ttlMs);
+ }
+ if (configureTtlOptions) {
+ agentsConfig.set(
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE,
+ ShortTermMemoryTtlUpdate.ON_CREATE_AND_WRITE);
+ agentsConfig.set(
+
AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY,
+ ShortTermMemoryTtlVisibility.NEVER_RETURN_EXPIRED);
+ }
+
+ List<TestInput> testData = new ArrayList<>();
+ testData.add(new TestInput("event1", sleepMs));
+ testData.add(new TestInput("event2", sleepMs));
+ testData.add(new TestInput("event1", sleepMs));
+
+ DataStream<TestInput> inputStream = env.fromCollection(testData);
+ DataStream<Object> outputStream =
+ agentEnv.fromDataStream(inputStream, x -> MEMORY_KEY)
+ .apply(new TTLTestAgent())
+ .toDataStream();
+
+ List<String> results = new ArrayList<>();
+
outputStream.map(Object::toString).executeAndCollect().forEachRemaining(results::add);
+ return results;
+ }
+}