weiqingy commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3300029799


##########
runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.memory;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.context.MemoryObject;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Integration test for Short-Term Memory TTL functionality. */
+class ShortTermMemoryTTLIntegrationTest {
+
+    private static final String MEMORY_KEY = "test_key";
+
+    private static final class TestInput {
+        public String eventKey;
+        public long sleepMs;
+
+        private TestInput() {}
+
+        private TestInput(String eventKey, long sleepMs) {
+            this.eventKey = eventKey;
+            this.sleepMs = sleepMs;
+        }
+    }
+
+    public static class TTLTestAgent extends Agent {
+
+        @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
+        public static void input(org.apache.flink.agents.api.Event event, 
RunnerContext ctx)
+                throws Exception {
+            InputEvent inputEvent = (InputEvent) event;
+            TestInput input = (TestInput) inputEvent.getInput();
+
+            MemoryObject shortTermMemory = ctx.getShortTermMemory();
+            MemoryObject memoryObject = shortTermMemory.get(input.eventKey);
+
+            Object existingValue = null;
+            int currentCount = 0;
+            if (memoryObject != null && !memoryObject.isNestedObject()) {
+                existingValue = memoryObject.getValue();
+                if (existingValue instanceof Integer) {
+                    currentCount = (Integer) existingValue;
+                } else if (existingValue instanceof Number) {
+                    currentCount = ((Number) existingValue).intValue();
+                }
+            }
+
+            shortTermMemory.set(input.eventKey, currentCount + 1);
+            Thread.sleep(input.sleepMs);
+            ctx.sendEvent(
+                    new OutputEvent(
+                            input.eventKey + "|" + (existingValue == null ? 
"NEW" : "EXISTING")));
+        }
+    }
+
+    @Test
+    void testTTLConfigurationNotApplied() throws Exception {
+        List<String> results = runScenario(1000L, 0L);
+
+        assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"), 
results);
+    }
+
+    @Test
+    void testTTLConfigurationApplied() throws Exception {

Review Comment:
   Two gaps in coverage worth closing while we're here:
   
   - **TTL disabled path.** No test exercises the early-return in 
`maybeEnableShortTermMemoryTTL` (`TTL_MS = 0` or unset). That branch is the 
default for every existing pipeline, so a regression that accidentally enables 
TTL with 0ms would be silent. A short scenario with `ttlMs = 0` confirming the 
third `event1` returns `EXISTING` regardless of sleep would cover it.
   - **Default update-type / visibility.** Both tests explicitly override the 
update-type to `OnCreateAndWrite`. The option defaults (`OnReadAndWrite` / 
`NeverReturnExpired`) are never actually exercised, so a regression in how the 
defaults are wired would slip through. One scenario that sets only `TTL_MS` and 
asserts the same expiry behavior would catch that.



##########
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java:
##########
@@ -74,6 +74,7 @@
  *   public static ResourceDesc openAIResponses() {
  *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName())
  *             .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
+ *             .addInitialArgument("api_base_url", 
System.getenv("OPENAI_API_URL"))

Review Comment:
   This javadoc tweak isn't related to short-term-memory TTL. Mind splitting it 
out of this PR? Either it can ride as its own one-line hotfix, or it can be 
folded into the next OpenAI-touching change. Keeps the diff focused and the 
commit history honest — repo convention is one logical change per PR.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +129,39 @@ void 
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
                                 PENDING_INPUT_EVENT_STATE_NAME, 
TypeInformation.of(Event.class)));
     }
 
+    /**
+     * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is 
positive, attaches Flink
+     * {@link StateTtlConfig} to the short-term memory {@link 
MapStateDescriptor}. Unset, null, or
+     * non-positive values disable TTL (Flink does not allow zero/negative 
TTL).

Review Comment:
   Nice — recording *"Flink does not allow zero/negative TTL"* at the 
workaround site spares future maintainers a trip through Flink source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to