xintongsong commented on code in PR #67: URL: https://github.com/apache/flink-agents/pull/67#discussion_r2214799072
########## runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.memory; + +import org.apache.flink.agents.api.context.MemoryObject; +import org.apache.flink.api.common.state.MapState; + +import java.util.*; + +public class MemoryObjectImpl implements MemoryObject { + + public enum ItemType { + OBJECT, + VALUE + } Review Comment: Should be private ########## python/flink_agents/api/memoryobject.py: ########## @@ -0,0 +1,111 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from abc import ABC, abstractmethod +from typing import Any, Dict, List + +from pydantic import BaseModel + + +class MemoryObject(BaseModel, ABC): Review Comment: File name: memory_object ########## python/flink_agents/runtime/local_memory_object.py: ########## @@ -0,0 +1,229 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any, ClassVar, Dict, List + +from flink_agents.api.memoryobject import MemoryObject + + +class LocalMemoryObject(MemoryObject): + """LocalMemoryObject: Flattened hierarchical key-value store for local + python execution. 
+ + Each object keeps a prefix to represent its logical path to a flattened + key in the store. + """ + + ROOT_KEY: ClassVar[str] = "" + SEPARATOR: ClassVar[str] = "." + NESTED_MARK: ClassVar[str] = "NestedObject" + + _store: dict[str, Any] + _prefix: str Review Comment: Should these be private? ########## runtime/src/main/java/org/apache/flink/agents/runtime/memory/MemoryObjectImpl.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.agents.runtime.memory; + +import org.apache.flink.agents.api.context.MemoryObject; +import org.apache.flink.api.common.state.MapState; + +import java.util.*; + +public class MemoryObjectImpl implements MemoryObject { + + public enum ItemType { + OBJECT, + VALUE + } + + public static final String ROOT_KEY = ""; + private static final String SEPARATOR = "."; + + private final MapState<String, MemoryItem> store; + private final String prefix; + + public MemoryObjectImpl(MapState<String, MemoryItem> store, String prefix) throws Exception { + this.store = store; + this.prefix = prefix; + if (!store.contains(ROOT_KEY)) { + store.put(ROOT_KEY, new MemoryItem()); + } + } + + @Override + public MemoryObject get(String path) throws Exception { + String absPath = fullPath(path); + if (store.contains(absPath)) { + return new MemoryObjectImpl(store, absPath); + } + return null; + } + + @Override + public void set(String path, Object value) throws Exception { + String absPath = fullPath(path); + String[] parts = absPath.split("\\."); + fillParents(parts); + + String parent = + absPath.contains(SEPARATOR) + ? 
absPath.substring(0, absPath.lastIndexOf(SEPARATOR)) + : ROOT_KEY; + MemoryItem parentItem = store.get(parent); + parentItem.getSubKeys().add(parts[parts.length - 1]); + store.put(parent, parentItem); + + MemoryItem existing = store.get(absPath); + if (existing != null && existing.getType() == ItemType.OBJECT) { + throw new IllegalArgumentException("Cannot overwrite object with value: " + absPath); + } + + MemoryItem val = new MemoryItem(value); + store.put(absPath, val); + } + + @Override + public MemoryObject newObject(String path, boolean overwrite) throws Exception { + String absPath = fullPath(path); + String[] parts = absPath.split("\\."); + + fillParents(parts); + + if (store.contains(absPath)) { + MemoryItem existing = store.get(absPath); + if (existing.getType() != ItemType.OBJECT) { + if (!overwrite) { + throw new IllegalArgumentException( + "Field '" + absPath + "' exists but is not an object."); + } + store.put(absPath, new MemoryItem()); + } + } else { + store.put(absPath, new MemoryItem()); + } + + String parent = + absPath.contains(SEPARATOR) + ? 
absPath.substring(0, absPath.lastIndexOf(SEPARATOR)) + : ROOT_KEY; + MemoryItem parentItem = store.get(parent); + parentItem.getSubKeys().add(parts[parts.length - 1]); + store.put(parent, parentItem); + + return new MemoryObjectImpl(store, absPath); + } + + @Override + public boolean isExist(String path) { + try { + return store.contains(fullPath(path)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public List<String> getFieldNames() throws Exception { + MemoryItem memItem = store.get(prefix); + if (memItem != null && memItem.getType() == ItemType.OBJECT) { + return new ArrayList<>(memItem.getSubKeys()); + } + return Collections.emptyList(); + } + + @Override + public Map<String, Object> getFields() throws Exception { + Map<String, Object> result = new HashMap<>(); + for (String name : getFieldNames()) { + String absPath = fullPath(name); + MemoryItem memItem = store.get(absPath); + if (memItem.getType() == ItemType.OBJECT) { + result.put(name, "NestedObject"); + } else { + result.put(name, memItem.getValue()); + } + } + return result; + } + + @Override + public boolean isNestedObject() throws Exception { + MemoryItem memItem = store.get(prefix); + return memItem != null && memItem.getType() == ItemType.OBJECT; + } + + @Override + public Object getValue() throws Exception { + MemoryItem memItem = store.get(prefix); + if (memItem != null && memItem.getType() == ItemType.VALUE) { + return memItem.getValue(); + } + return null; + } + + private String fullPath(String path) { + return (prefix.isEmpty() ? path : prefix + SEPARATOR + path); + } + + private void fillParents(String[] parts) throws Exception { + StringBuilder path = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + if (i > 0) path.append(SEPARATOR); + path.append(parts[i]); + + String cur = path.toString(); + String parent = (i == 0) ? 
ROOT_KEY : path.substring(0, path.lastIndexOf(SEPARATOR)); + + if (!store.contains(cur)) { + store.put(cur, new MemoryItem()); // OBJECT node + } + if (!store.contains(parent)) { + store.put(parent, new MemoryItem()); // make sure parent exists + } + // update parent.subKeys + MemoryItem parentNode = store.get(parent); + parentNode.getSubKeys().add(parts[i]); + store.put(parent, parentNode); + } + } + + /** Represents an item (nested object or primitive value) stored in the short-term memory. */ + public static final class MemoryItem { + private final ItemType type; + private Object value; + private Set<String> subKeys; Review Comment: Should be final ########## python/flink_agents/runtime/local_memory_object.py: ########## @@ -0,0 +1,229 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any, ClassVar, Dict, List + +from flink_agents.api.memoryobject import MemoryObject + + +class LocalMemoryObject(MemoryObject): + """LocalMemoryObject: Flattened hierarchical key-value store for local + python execution. 
+ + Each object keeps a prefix to represent its logical path to a flattened + key in the store. + """ + + ROOT_KEY: ClassVar[str] = "" + SEPARATOR: ClassVar[str] = "." + NESTED_MARK: ClassVar[str] = "NestedObject" + + _store: dict[str, Any] + _prefix: str + + def __init__(self, store: Dict[str, Any], prefix: str = ROOT_KEY) -> None: + """Initialize a LocalMemoryObject. + + Parameters + ---------- + store : Dict[str, Any] + The dictionary used as the underlying storage. + prefix : str, default ROOT_KEY + Path prefix that identifies the current position of the object in the + shared store. + """ + super().__init__() + self._store = store if store is not None else {} + self._prefix = prefix + + if self.ROOT_KEY not in self._store: + self._store[self.ROOT_KEY] = _ObjMarker() + + def get(self, path: str) -> Any: + """Get the value of a (direct or indirect) field in the object. + + Parameters + ---------- + path: str + Relative path from the current object to the target field. + + Returns: + ------- + Any + If the field is a direct field, returns the concrete data stored. + If the field is an indirect field, another MemoryObject will be returned. + If the field doesn't exist, returns None. + """ + abs_path = self._full_path(path) + if abs_path in self._store: + value = self._store[abs_path] + if self._is_nested_object(value): + return LocalMemoryObject(self._store, abs_path) + return value + return None + + def set(self, path: str, value: Any) -> None: + """Set the value of a (direct or indirect) field in the object. + This will also create the intermediate objects if not exist. + + Parameters + ---------- + path: str + Relative path from the current object to the target field. + value: Any + New value of the field. + The type of the value must be either a primary type, or MemoryObject. + """ + if isinstance(value, LocalMemoryObject): + msg = "Do not set a MemoryObject instance directly; use new_object()." 
+ raise TypeError(msg) + + abs_path = self._full_path(path) + parts = abs_path.split(self.SEPARATOR) + + self._fill_parents(parts) + + parent_path = ( + self.SEPARATOR.join(parts[:-1]) if len(parts) > 1 else self.ROOT_KEY + ) + self._add_subfield(parent_path, parts[-1]) + + if abs_path in self._store and self._is_nested_object(self._store[abs_path]): + msg = f"Cannot overwrite object field '{abs_path}' with primitive." + raise ValueError(msg) + + self._store[abs_path] = value + + def new_object(self, path: str, *, overwrite: bool = False) -> "LocalMemoryObject": + """Create a new object as the value of an indirect field in the object. + + Parameters + ---------- + path: str + Relative path from the current object to the target field. + + Returns: + ------- + MemoryObject + The created object. + """ + abs_path = self._full_path(path) + parts = abs_path.split(self.SEPARATOR) + + self._fill_parents(parts) + + parent_path = ( + self.SEPARATOR.join(parts[:-1]) if len(parts) > 1 else self.ROOT_KEY + ) Review Comment: This parent-path computation is duplicated in several places. Maybe extract it into a utility method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
