weiqingy commented on code in PR #657: URL: https://github.com/apache/flink-agents/pull/657#discussion_r3300029799
########## runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.memory; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.context.MemoryObject; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.plan.AgentConfiguration; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Integration test for Short-Term Memory TTL functionality. */ +class ShortTermMemoryTTLIntegrationTest { + + private static final String MEMORY_KEY = "test_key"; + + private static final class TestInput { + public String eventKey; + public long sleepMs; + + private TestInput() {} + + private TestInput(String eventKey, long sleepMs) { + this.eventKey = eventKey; + this.sleepMs = sleepMs; + } + } + + public static class TTLTestAgent extends Agent { + + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void input(org.apache.flink.agents.api.Event event, RunnerContext ctx) + throws Exception { + InputEvent inputEvent = (InputEvent) event; + TestInput input = (TestInput) inputEvent.getInput(); + + MemoryObject shortTermMemory = ctx.getShortTermMemory(); + MemoryObject memoryObject = shortTermMemory.get(input.eventKey); + + Object existingValue = null; + int currentCount = 0; + if (memoryObject != null && !memoryObject.isNestedObject()) { + existingValue = memoryObject.getValue(); + if (existingValue instanceof Integer) { + currentCount = (Integer) existingValue; + } else if (existingValue instanceof Number) { + currentCount = ((Number) existingValue).intValue(); + } + } + + shortTermMemory.set(input.eventKey, currentCount + 1); + Thread.sleep(input.sleepMs); + ctx.sendEvent( + new OutputEvent( + input.eventKey + "|" + (existingValue == null ? "NEW" : "EXISTING"))); + } + } + + @Test + void testTTLConfigurationNotApplied() throws Exception { + List<String> results = runScenario(1000L, 0L); + + assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"), results); + } + + @Test + void testTTLConfigurationApplied() throws Exception { Review Comment: Two gaps in coverage worth closing while we're here: - **TTL disabled path.** No test exercises the early-return in `maybeEnableShortTermMemoryTTL` (`TTL_MS = 0` or unset). That branch is the default for every existing pipeline, so a regression that accidentally enables TTL with 0ms would be silent. A short scenario with `ttlMs = 0` confirming the third `event1` returns `EXISTING` regardless of sleep would cover it. - **Default update-type / visibility.** Both tests explicitly override the update-type to `OnCreateAndWrite`. The option defaults (`OnReadAndWrite` / `NeverReturnExpired`) are never actually exercised, so a regression in how the defaults are wired would slip through. One scenario that sets only `TTL_MS` and asserts the same expiry behavior would catch that. ########## integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java: ########## @@ -74,6 +74,7 @@ * public static ResourceDesc openAIResponses() { * return ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName()) * .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY")) + * .addInitialArgument("api_base_url", System.getenv("OPENAI_API_URL")) Review Comment: This javadoc tweak isn't related to short-term-memory TTL. Mind splitting it out of this PR? Either it can ride as its own one-line hotfix, or it can be folded into the next OpenAI-touching change. Keeps the diff focused and the commit history honest — repo convention is one logical change per PR. ########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java: ########## @@ -121,6 +129,39 @@ void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext PENDING_INPUT_EVENT_STATE_NAME, TypeInformation.of(Event.class))); } + /** + * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink + * {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or + * non-positive values disable TTL (Flink does not allow zero/negative TTL). Review Comment: Nice — recording *"Flink does not allow zero/negative TTL"* at the workaround site spares future maintainers a trip through Flink source. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
