This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit b0e28b0dc20f9035663e1cf61c639d68dca571ec
Author: WenjinXie <[email protected]>
AuthorDate: Wed Nov 26 19:02:17 2025 +0800

    fixup! [api][runtime][java] Introduce sensory memory in java.
    
    store memory type to resolve memory ref
---
 .../flink/agents/api/context/MemoryObject.java     |  4 ++++
 .../apache/flink/agents/api/context/MemoryRef.java | 25 +++++++++++++++++++---
 .../agents/runtime/context/RunnerContextImpl.java  |  2 ++
 .../agents/runtime/memory/MemoryObjectImpl.java    | 15 ++++++++-----
 .../agents/runtime/memory/MemoryObjectTest.java    |  5 ++++-
 .../flink/agents/runtime/memory/MemoryRefTest.java |  3 ++-
 6 files changed, 44 insertions(+), 10 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java 
b/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java
index 6d6af4b..78c325e 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/MemoryObject.java
@@ -27,6 +27,10 @@ import java.util.Map;
  * nested object.Fields can be accessed using an absolute or relative path.
  */
 public interface MemoryObject {
+    enum MemoryType {
+        SENSORY,
+        SHORT_TERM
+    }
     /**
      * Returns a MemoryObject that represents the given path.
      *
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java 
b/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java
index 219909a..f3ff87e 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/MemoryRef.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.agents.api.context;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -28,9 +30,15 @@ import java.util.Objects;
 public final class MemoryRef implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private final MemoryObject.MemoryType type;
     private final String path;
 
     private MemoryRef(String path) {
+        this(MemoryObject.MemoryType.SHORT_TERM, path);
+    }
+
+    private MemoryRef(MemoryObject.MemoryType type, String path) {
+        this.type = type;
         this.path = path;
     }
 
@@ -40,6 +48,11 @@ public final class MemoryRef implements Serializable {
      * @param path The absolute path of the data in Short-Term Memory.
      * @return A new MemoryRef instance.
      */
+    public static MemoryRef create(MemoryObject.MemoryType type, String path) {
+        return new MemoryRef(type, path);
+    }
+
+    @VisibleForTesting
     public static MemoryRef create(String path) {
         return new MemoryRef(path);
     }
@@ -47,12 +60,18 @@ public final class MemoryRef implements Serializable {
     /**
      * Resolves the reference using the provided RunnerContext to get the 
actual data.
      *
-     * @param memory The memory this ref based on.
+     * @param ctx The current execution context, used to access Short-Term 
Memory.
      * @return The deserialized, original data object.
      * @throws Exception if the memory cannot be accessed or the data cannot 
be resolved.
      */
-    public MemoryObject resolve(MemoryObject memory) throws Exception {
-        return memory.get(this);
+    public MemoryObject resolve(RunnerContext ctx) throws Exception {
+        if (type.equals(MemoryObject.MemoryType.SENSORY)) {
+            return ctx.getSensoryMemory().get(this);
+        } else if (type.equals(MemoryObject.MemoryType.SHORT_TERM)) {
+            return ctx.getShortTermMemory().get(this);
+        } else {
+            throw new RuntimeException(String.format("Unknown memory type %s", 
type));
+        }
     }
 
     public String getPath() {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index e02ec45..6321d98 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -131,6 +131,7 @@ public class RunnerContextImpl implements RunnerContext {
     public MemoryObject getSensoryMemory() throws Exception {
         mailboxThreadChecker.run();
         return new MemoryObjectImpl(
+                MemoryObject.MemoryType.SENSORY,
                 sensoryMemStore,
                 MemoryObjectImpl.ROOT_KEY,
                 mailboxThreadChecker,
@@ -141,6 +142,7 @@ public class RunnerContextImpl implements RunnerContext {
     public MemoryObject getShortTermMemory() throws Exception {
         mailboxThreadChecker.run();
         return new MemoryObjectImpl(
+                MemoryObject.MemoryType.SHORT_TERM,
                 shortTermMemStore,
                 MemoryObjectImpl.ROOT_KEY,
                 mailboxThreadChecker,
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java
index a8fa1cb..b9550a6 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java
@@ -40,22 +40,27 @@ public class MemoryObjectImpl implements MemoryObject {
     public static final String ROOT_KEY = "";
     private static final String SEPARATOR = ".";
 
+    private final MemoryType type;
+
     private final MemoryStore store;
     private final List<MemoryUpdate> memoryUpdates;
     private final String prefix;
     private final Runnable mailboxThreadChecker;
 
-    public MemoryObjectImpl(MemoryStore store, String prefix, 
List<MemoryUpdate> memoryUpdates)
+    public MemoryObjectImpl(
+            MemoryType type, MemoryStore store, String prefix, 
List<MemoryUpdate> memoryUpdates)
             throws Exception {
-        this(store, prefix, () -> {}, memoryUpdates);
+        this(type, store, prefix, () -> {}, memoryUpdates);
     }
 
     public MemoryObjectImpl(
+            MemoryType type,
             MemoryStore store,
             String prefix,
             Runnable mailboxThreadChecker,
             List<MemoryUpdate> memoryUpdates)
             throws Exception {
+        this.type = type;
         this.store = store;
         this.prefix = prefix;
         this.mailboxThreadChecker = mailboxThreadChecker;
@@ -70,7 +75,7 @@ public class MemoryObjectImpl implements MemoryObject {
         mailboxThreadChecker.run();
         String absPath = fullPath(path);
         if (store.contains(absPath)) {
-            return new MemoryObjectImpl(store, absPath, memoryUpdates);
+            return new MemoryObjectImpl(type, store, absPath, memoryUpdates);
         }
         return null;
     }
@@ -104,7 +109,7 @@ public class MemoryObjectImpl implements MemoryObject {
         store.put(absPath, val);
         memoryUpdates.add(new MemoryUpdate(absPath, value));
 
-        return MemoryRef.create(absPath);
+        return MemoryRef.create(type, absPath);
     }
 
     @Override
@@ -137,7 +142,7 @@ public class MemoryObjectImpl implements MemoryObject {
         parentItem.getSubKeys().add(parts[parts.length - 1]);
         store.put(parent, parentItem);
 
-        return new MemoryObjectImpl(store, absPath, memoryUpdates);
+        return new MemoryObjectImpl(type, store, absPath, memoryUpdates);
     }
 
     @Override
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java
index 970fe32..07949a6 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryObjectTest.java
@@ -63,7 +63,10 @@ public class MemoryObjectTest {
         memoryUpdates = new LinkedList<>();
         memory =
                 new MemoryObjectImpl(
-                        new CachedMemoryStore(mapState), 
MemoryObjectImpl.ROOT_KEY, memoryUpdates);
+                        MemoryObject.MemoryType.SHORT_TERM,
+                        new CachedMemoryStore(mapState),
+                        MemoryObjectImpl.ROOT_KEY,
+                        memoryUpdates);
     }
 
     @Test
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
index 46a68d9..8a0e02e 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
@@ -117,6 +117,7 @@ public class MemoryRefTest {
         ForTestMemoryMapState<MemoryObjectImpl.MemoryItem> mapState = new 
ForTestMemoryMapState<>();
         memory =
                 new MemoryObjectImpl(
+                        MemoryObject.MemoryType.SHORT_TERM,
                         new CachedMemoryStore(mapState),
                         MemoryObjectImpl.ROOT_KEY,
                         new LinkedList<>());
@@ -181,7 +182,7 @@ public class MemoryRefTest {
         for (Map.Entry<String, Object> entry : testData.entrySet()) {
             MemoryRef ref = memory.set(entry.getKey(), entry.getValue());
 
-            Object resolvedValue = 
ref.resolve(ctx.getShortTermMemory()).getValue();
+            Object resolvedValue = ref.resolve(ctx).getValue();
             assertEquals(entry.getValue(), resolvedValue);
         }
     }

Reply via email to