This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b0e28b0dc20f9035663e1cf61c639d68dca571ec Author: WenjinXie <[email protected]> AuthorDate: Wed Nov 26 19:02:17 2025 +0800 fixup! [api][runtime][java] Introduce sensory memory in java. store memory type to resolve memory ref --- .../flink/agents/api/context/MemoryObject.java | 4 ++++ .../apache/flink/agents/api/context/MemoryRef.java | 25 +++++++++++++++++++--- .../agents/runtime/context/RunnerContextImpl.java | 2 ++ .../agents/runtime/memory/MemoryObjectImpl.java | 15 ++++++++----- .../agents/runtime/memory/MemoryObjectTest.java | 5 ++++- .../flink/agents/runtime/memory/MemoryRefTest.java | 3 ++- 6 files changed, 44 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java b/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java index 6d6af4b..78c325e 100644 --- a/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java +++ b/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java @@ -27,6 +27,10 @@ import java.util.Map; * nested object.Fields can be accessed using an absolute or relative path. */ public interface MemoryObject { + enum MemoryType { + SENSORY, + SHORT_TERM + } /** * Returns a MemoryObject that represents the given path. * diff --git a/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java b/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java index 219909a..f3ff87e 100644 --- a/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java +++ b/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java @@ -17,6 +17,8 @@ */ package org.apache.flink.agents.api.context; +import org.apache.flink.annotation.VisibleForTesting; + import java.io.Serializable; import java.util.Objects; @@ -28,9 +30,15 @@ import java.util.Objects; public final class MemoryRef implements Serializable { private static final long serialVersionUID = 1L; + private final MemoryObject.MemoryType type; private final String path; private MemoryRef(String path) { + this(MemoryObject.MemoryType.SHORT_TERM, path); + } + + private MemoryRef(MemoryObject.MemoryType type, String path) { + this.type = type; this.path = path; } @@ -40,6 +48,11 @@ public final class MemoryRef implements Serializable { * @param path The absolute path of the data in Short-Term Memory. * @return A new MemoryRef instance. */ + public static MemoryRef create(MemoryObject.MemoryType type, String path) { + return new MemoryRef(type, path); + } + + @VisibleForTesting public static MemoryRef create(String path) { return new MemoryRef(path); } @@ -47,12 +60,18 @@ public final class MemoryRef implements Serializable { /** * Resolves the reference using the provided RunnerContext to get the actual data. * - * @param memory The memory this ref based on. + * @param ctx The current execution context, used to access Short-Term Memory. * @return The deserialized, original data object. * @throws Exception if the memory cannot be accessed or the data cannot be resolved. */ - public MemoryObject resolve(MemoryObject memory) throws Exception { - return memory.get(this); + public MemoryObject resolve(RunnerContext ctx) throws Exception { + if (type.equals(MemoryObject.MemoryType.SENSORY)) { + return ctx.getSensoryMemory().get(this); + } else if (type.equals(MemoryObject.MemoryType.SHORT_TERM)) { + return ctx.getShortTermMemory().get(this); + } else { + throw new RuntimeException(String.format("Unknown memory type %s", type)); + } } public String getPath() { diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java index e02ec45..6321d98 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java @@ -131,6 +131,7 @@ public class RunnerContextImpl implements RunnerContext { public MemoryObject getSensoryMemory() throws Exception { mailboxThreadChecker.run(); return new MemoryObjectImpl( + MemoryObject.MemoryType.SENSORY, sensoryMemStore, MemoryObjectImpl.ROOT_KEY, mailboxThreadChecker, @@ -141,6 +142,7 @@ public class RunnerContextImpl implements RunnerContext { public MemoryObject getShortTermMemory() throws Exception { mailboxThreadChecker.run(); return new MemoryObjectImpl( + MemoryObject.MemoryType.SHORT_TERM, shortTermMemStore, MemoryObjectImpl.ROOT_KEY, mailboxThreadChecker, diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java index a8fa1cb..b9550a6 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java @@ -40,22 +40,27 @@ public class MemoryObjectImpl implements MemoryObject { public static final String ROOT_KEY = ""; private static final String SEPARATOR = "."; + private final MemoryType type; + private final MemoryStore store; private final List<MemoryUpdate> memoryUpdates; private final String prefix; private final Runnable mailboxThreadChecker; - public MemoryObjectImpl(MemoryStore store, String prefix, List<MemoryUpdate> memoryUpdates) + public MemoryObjectImpl( + MemoryType type, MemoryStore store, String prefix, List<MemoryUpdate> memoryUpdates) throws Exception { - this(store, prefix, () -> {}, memoryUpdates); + this(type, store, prefix, () -> {}, memoryUpdates); } public MemoryObjectImpl( + MemoryType type, MemoryStore store, String prefix, Runnable mailboxThreadChecker, List<MemoryUpdate> memoryUpdates) throws Exception { + this.type = type; this.store = store; this.prefix = prefix; this.mailboxThreadChecker = mailboxThreadChecker; @@ -70,7 +75,7 @@ public class MemoryObjectImpl implements MemoryObject { mailboxThreadChecker.run(); String absPath = fullPath(path); if (store.contains(absPath)) { - return new MemoryObjectImpl(store, absPath, memoryUpdates); + return new MemoryObjectImpl(type, store, absPath, memoryUpdates); } return null; } @@ -104,7 +109,7 @@ public class MemoryObjectImpl implements MemoryObject { store.put(absPath, val); memoryUpdates.add(new MemoryUpdate(absPath, value)); - return MemoryRef.create(absPath); + return MemoryRef.create(type, absPath); } @Override @@ -137,7 +142,7 @@ public class MemoryObjectImpl implements MemoryObject { parentItem.getSubKeys().add(parts[parts.length - 1]); store.put(parent, parentItem); - return new MemoryObjectImpl(store, absPath, memoryUpdates); + return new MemoryObjectImpl(type, store, absPath, memoryUpdates); } @Override diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java index 970fe32..07949a6 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java @@ -63,7 +63,10 @@ public class MemoryObjectTest { memoryUpdates = new LinkedList<>(); memory = new MemoryObjectImpl( - new CachedMemoryStore(mapState), MemoryObjectImpl.ROOT_KEY, memoryUpdates); + MemoryObject.MemoryType.SHORT_TERM, + new CachedMemoryStore(mapState), + MemoryObjectImpl.ROOT_KEY, + memoryUpdates); } @Test diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java index 46a68d9..8a0e02e 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java @@ -117,6 +117,7 @@ public class MemoryRefTest { ForTestMemoryMapState<MemoryObjectImpl.MemoryItem> mapState = new ForTestMemoryMapState<>(); memory = new MemoryObjectImpl( + MemoryObject.MemoryType.SHORT_TERM, new CachedMemoryStore(mapState), MemoryObjectImpl.ROOT_KEY, new LinkedList<>()); @@ -181,7 +182,7 @@ public class MemoryRefTest { for (Map.Entry<String, Object> entry : testData.entrySet()) { MemoryRef ref = memory.set(entry.getKey(), entry.getValue()); - Object resolvedValue = ref.resolve(ctx.getShortTermMemory()).getValue(); + Object resolvedValue = ref.resolve(ctx).getValue(); assertEquals(entry.getValue(), resolvedValue); } }
