This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 42aea1c [api][runtime] Introduce configuration mechanism (#122)
42aea1c is described below
commit 42aea1cc26cba4704eef33c21f55149ebdc6aa1b
Author: Eugene <[email protected]>
AuthorDate: Thu Sep 4 15:20:38 2025 +0800
[api][runtime] Introduce configuration mechanism (#122)
---
.../agents/api/AgentsExecutionEnvironment.java | 8 +
.../api/configuration/AgentConfigOptions.java | 26 +++
.../agents/api/configuration/ConfigOption.java | 102 +++++++++
.../agents/api/configuration/Configuration.java | 24 +++
.../api/configuration/ReadableConfiguration.java | 84 ++++++++
.../api/configuration/WritableConfiguration.java | 77 +++++++
.../flink/agents/api/context/RunnerContext.java | 8 +
.../test-scripts/test_java_config_in_python.sh | 40 ++++
.../flink/agents/plan/AgentConfiguration.java | 176 ++++++++++++++++
.../org/apache/flink/agents/plan/AgentPlan.java | 30 +++
.../plan/serializer/AgentPlanJsonDeserializer.java | 16 +-
.../plan/serializer/AgentPlanJsonSerializer.java | 19 ++
.../flink/agents/plan/AgentConfigurationTest.java | 218 ++++++++++++++++++++
.../serializer/AgentPlanJsonDeserializerTest.java | 10 +
.../serializer/AgentPlanJsonSerializerTest.java | 11 +-
.../src/test/resources/agent_plans/agent_plan.json | 8 +
python/flink_agents/api/configuration.py | 169 +++++++++++++++
python/flink_agents/api/core_options.py | 70 +++++++
python/flink_agents/api/execution_environment.py | 11 +
python/flink_agents/api/runner_context.py | 11 +
python/flink_agents/plan/agent_plan.py | 7 +-
python/flink_agents/plan/configuration.py | 173 ++++++++++++++++
python/flink_agents/plan/resource_provider.py | 26 ++-
...py => create_python_option_from_java_option.py} | 35 ++--
.../compatibility/generate_agent_plan_json.py | 3 +-
.../plan/tests/resources/agent_plan.json | 5 +
python/flink_agents/plan/tests/test_agent_plan.py | 11 +-
.../flink_agents/plan/tests/test_configuration.py | 228 +++++++++++++++++++++
.../flink_agents/runtime/flink_runner_context.py | 12 ++
.../runtime/local_execution_environment.py | 19 +-
python/flink_agents/runtime/local_runner.py | 19 +-
.../runtime/remote_execution_environment.py | 29 ++-
python/pyproject.toml | 1 +
.../agents/runtime/context/RunnerContextImpl.java | 6 +
.../runtime/env/RemoteExecutionEnvironment.java | 49 ++++-
.../env/RemoteExecutionEnvironmentTest.java | 77 +++++++
.../flink/agents/runtime/memory/MemoryRefTest.java | 6 +
tools/e2e.sh | 1 +
38 files changed, 1777 insertions(+), 48 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
index e01e0a4..efac149 100644
---
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
+++
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
@@ -18,6 +18,7 @@
package org.apache.flink.agents.api;
+import org.apache.flink.agents.api.configuration.Configuration;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -86,6 +87,13 @@ public abstract class AgentsExecutionEnvironment {
return getExecutionEnvironment(null);
}
+ /**
+ * Returns a configuration object for reading and setting configuration values.
+ *
+ * @return the {@link Configuration} instance used to read and modify configuration settings
+ */
+ public abstract Configuration getConfig();
+
/**
* Set input for agents from a list. Used for local execution.
*
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
new file mode 100644
index 0000000..49e476d
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/** The set of configuration options for agent parameters. */
+public class AgentConfigOptions {
+
+ /** The config parameter specifies the directory for the FileEvent file. */
+ public static final ConfigOption<String> BASE_LOG_DIR =
+ new ConfigOption<>("baseLogDir", String.class, null);
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java
new file mode 100644
index 0000000..b43688e
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+import java.util.Objects;
+
+/** A {@code ConfigOption} describes a configuration parameter. */
+public class ConfigOption<T> {
+ private final String key;
+ private final Class<T> type;
+ private final T defaultValue;
+
+ /**
+ * Constructs a new configuration option.
+ *
+ * @param key The configuration key name
+ * @param type The expected type of the configuration value
+ * @param defaultValue The default value for this configuration option (can be null)
+ */
+ public ConfigOption(String key, Class<T> type, T defaultValue) {
+ this.key = Objects.requireNonNull(key);
+ this.type = Objects.requireNonNull(type);
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Gets the configuration key.
+ *
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Gets the expected type of the configuration value.
+ *
+ * @return the type
+ */
+ public Class<T> getType() {
+ return type;
+ }
+
+ /**
+ * Gets the expected type name of the configuration value.
+ *
+ * @return the type name
+ */
+ public String getTypeName() {
+ return type.getTypeName();
+ }
+
+ /**
+ * Gets the default value of this configuration option.
+ *
+ * @return the default value, or null if not set
+ */
+ public T getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigOption<?> that = (ConfigOption<?>) o;
+ return key.equals(that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigOption{"
+ + "key='"
+ + key
+ + '\''
+ + ", type="
+ + type.getName()
+ + ", defaultValue="
+ + defaultValue
+ + '}';
+ }
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
new file mode 100644
index 0000000..743a939
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/**
+ * A configuration interface that provides both read and write access. Combines the capabilities of
+ * {@link ReadableConfiguration} and {@link WritableConfiguration}.
+ */
+public interface Configuration extends ReadableConfiguration,
WritableConfiguration {}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
new file mode 100644
index 0000000..8287629
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/** Read access to a configuration object. */
+public interface ReadableConfiguration {
+ /**
+ * Get the integer configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The integer value associated with the key or the default value
+ */
+ Integer getInt(String key, Integer defaultValue);
+
+ /**
+ * Get the long configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The long value associated with the key or the default value
+ */
+ Long getLong(String key, Long defaultValue);
+
+ /**
+ * Get the float configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The float value associated with the key or the default value
+ */
+ Float getFloat(String key, Float defaultValue);
+
+ /**
+ * Get the double configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The double value associated with the key or the default value
+ */
+ Double getDouble(String key, Double defaultValue);
+
+ /**
+ * Get the boolean configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The boolean value associated with the key or the default value
+ */
+ Boolean getBool(String key, Boolean defaultValue);
+
+ /**
+ * Get the string configuration value by key.
+ *
+ * @param key The configuration key to retrieve
+ * @param defaultValue The default value to return if key is not found
+ * @return The string value associated with the key or the default value
+ */
+ String getStr(String key, String defaultValue);
+
+ /**
+ * Get the configuration value by ConfigOption.
+ *
+ * @param option The metadata of the option to read
+ * @param <T> The type of the configuration value
+ * @return The value of the given option
+ */
+ <T> T get(ConfigOption<T> option);
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
new file mode 100644
index 0000000..1c10165
--- /dev/null
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/** Write access to a configuration object. */
+public interface WritableConfiguration {
+ /**
+ * Set the string configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The string value to set for the key
+ */
+ void setStr(String key, String value);
+
+ /**
+ * Set the integer configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The integer value to set for the key
+ */
+ void setInt(String key, int value);
+
+ /**
+ * Set the long configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The long value to set for the key
+ */
+ void setLong(String key, long value);
+
+ /**
+ * Set the float configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The float value to set for the key
+ */
+ void setFloat(String key, float value);
+
+ /**
+ * Set the double configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The double value to set for the key
+ */
+ void setDouble(String key, double value);
+
+ /**
+ * Set the boolean configuration value using the key.
+ *
+ * @param key The configuration key to set
+ * @param value The boolean value to set for the key
+ */
+ void setBool(String key, boolean value);
+
+ /**
+ * Set the configuration value using the ConfigOption.
+ *
+ * @param option The config option to set
+ * @param value The value to set for the key
+ */
+ <T> void set(ConfigOption<T> option, T value);
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
index 5776b09..67bf13a 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
@@ -18,6 +18,7 @@
package org.apache.flink.agents.api.context;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceType;
@@ -65,4 +66,11 @@ public interface RunnerContext {
* @throws Exception if the resource cannot be found or created
*/
Resource getResource(String name, ResourceType type) throws Exception;
+
+ /**
+ * Gets the configuration for Flink Agents.
+ *
+ * @return the configuration for Flink Agents.
+ */
+ ReadableConfiguration getConfig();
}
diff --git a/e2e-test/test-scripts/test_java_config_in_python.sh
b/e2e-test/test-scripts/test_java_config_in_python.sh
new file mode 100644
index 0000000..5302283
--- /dev/null
+++ b/e2e-test/test-scripts/test_java_config_in_python.sh
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+root_dir=$(pwd)
+
+echo $root_dir
+
+python_script_path=$root_dir/python/flink_agents/plan/tests/compatibility
+
+function test_create_python_option_from_java_option {
+ python $python_script_path/create_python_option_from_java_option.py
+
+ ret=$?
+ if [ "$ret" != "0" ]
+ then
+ echo "There is failure when create python option from java option, please
check the log for details."
+ rm -f $json_path
+ exit $ret
+ fi
+
+ rm -f $json_path
+}
+
+test_create_python_option_from_java_option
\ No newline at end of file
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java
new file mode 100644
index 0000000..34d6efa
--- /dev/null
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.apache.flink.agents.api.configuration.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Agent configuration which stores key/value pairs. */
+public class AgentConfiguration implements Configuration {
+ private final Map<String, Object> confData;
+
+ public AgentConfiguration() {
+ this.confData = new HashMap<>();
+ }
+
+ public AgentConfiguration(Map<String, Object> confData) {
+ this.confData = flatten(confData, "", ".");
+ }
+
+ public Map<String, Object> getConfData() {
+ return confData;
+ }
+
+ @Override
+ public void setStr(String key, String value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public void setInt(String key, int value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public void setLong(String key, long value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public void setFloat(String key, float value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public void setDouble(String key, double value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public void setBool(String key, boolean value) {
+ confData.put(key, value);
+ }
+
+ @Override
+ public <T> void set(ConfigOption<T> option, T value) {
+ if (value == null && option.getDefaultValue() != null) {
+ return;
+ }
+ confData.put(option.getKey(), value);
+ }
+
+ @Override
+ public Integer getInt(String key, Integer defaultValue) {
+ return Optional.ofNullable(confData.get(key))
+ .map(Object::toString)
+ .map(Integer::parseInt)
+ .orElse(defaultValue);
+ }
+
+ @Override
+ public Long getLong(String key, Long defaultValue) {
+ return Optional.ofNullable(confData.get(key))
+ .map(Object::toString)
+ .map(Long::parseLong)
+ .orElse(defaultValue);
+ }
+
+ @Override
+ public Float getFloat(String key, Float defaultValue) {
+ return Optional.ofNullable(confData.get(key))
+ .map(Object::toString)
+ .map(Float::parseFloat)
+ .orElse(defaultValue);
+ }
+
+ @Override
+ public Double getDouble(String key, Double defaultValue) {
+ return Optional.ofNullable(confData.get(key))
+ .map(Object::toString)
+ .map(Double::parseDouble)
+ .orElse(defaultValue);
+ }
+
+ @Override
+ public Boolean getBool(String key, Boolean defaultValue) {
+ return Optional.ofNullable(confData.get(key))
+ .map(Object::toString)
+ .map(Boolean::valueOf)
+ .orElse(defaultValue);
+ }
+
+ @Override
+ public String getStr(String key, String defaultValue) {
+ return
Optional.ofNullable(confData.get(key)).map(Object::toString).orElse(defaultValue);
+ }
+
+ @Override
+ public <T> T get(ConfigOption<T> option) {
+ Object rawValue = confData.get(option.getKey());
+ if (rawValue == null) {
+ return option.getDefaultValue();
+ }
+
+ Class<T> targetType = option.getType();
+
+ if (targetType.isAssignableFrom(rawValue.getClass())) {
+ return targetType.cast(rawValue);
+ } else if (String.class.equals(targetType)) {
+ return targetType.cast(rawValue.toString());
+ } else if (Integer.class.equals(targetType)) {
+ return targetType.cast(Integer.parseInt(rawValue.toString()));
+ } else if (Long.class.equals(targetType)) {
+ return targetType.cast(Long.parseLong(rawValue.toString()));
+ } else if (Float.class.equals(targetType)) {
+ return targetType.cast(Float.parseFloat(rawValue.toString()));
+ } else if (Double.class.equals(targetType)) {
+ return targetType.cast(Double.parseDouble(rawValue.toString()));
+ } else if (Boolean.class.equals(targetType)) {
+ return targetType.cast(Boolean.parseBoolean(rawValue.toString()));
+ } else {
+ throw new ClassCastException(
+ "Unsupported type conversion from "
+ + rawValue.getClass().getName()
+ + " to "
+ + targetType.getName());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<String, Object> flatten(
+ Map<String, Object> config, String keyPrefix, String keySeparator)
{
+ final Map<String, Object> flattenedMap = new HashMap<>();
+
+ config.forEach(
+ (key, value) -> {
+ String flattenedKey = keyPrefix + key;
+ if (value instanceof Map) {
+ Map<String, Object> e = (Map<String, Object>) value;
+ flattenedMap.putAll(flatten(e, flattenedKey +
keySeparator, keySeparator));
+ } else {
+ flattenedMap.put(flattenedKey, value);
+ }
+ });
+
+ return flattenedMap;
+ }
+}
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index b3bab29..3efee14 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -61,6 +61,8 @@ public class AgentPlan implements Serializable {
/** Two-level mapping of resource type to resource name to resource provider. */
private Map<ResourceType, Map<String, ResourceProvider>> resourceProviders;
+ private AgentConfiguration config;
+
/** Cache for instantiated resources. */
private transient Map<ResourceType, Map<String, Resource>> resourceCache;
@@ -68,6 +70,7 @@ public class AgentPlan implements Serializable {
this.actions = actions;
this.actionsByEvent = actionsByEvent;
this.resourceProviders = new HashMap<>();
+ this.config = new AgentConfiguration();
this.resourceCache = new ConcurrentHashMap<>();
}
@@ -79,6 +82,19 @@ public class AgentPlan implements Serializable {
this.actionsByEvent = actionsByEvent;
this.resourceProviders = resourceProviders;
this.resourceCache = new ConcurrentHashMap<>();
+ this.config = new AgentConfiguration();
+ }
+
+ public AgentPlan(
+ Map<String, Action> actions,
+ Map<String, List<Action>> actionsByEvent,
+ Map<ResourceType, Map<String, ResourceProvider>> resourceProviders,
+ AgentConfiguration config) {
+ this.actions = actions;
+ this.actionsByEvent = actionsByEvent;
+ this.resourceProviders = resourceProviders;
+ this.resourceCache = new ConcurrentHashMap<>();
+ this.config = config;
}
/**
@@ -89,9 +105,14 @@ public class AgentPlan implements Serializable {
* @throws Exception if there's an error creating actions from the agent
*/
public AgentPlan(Agent agent) throws Exception {
+ this(agent, new AgentConfiguration());
+ }
+
+ public AgentPlan(Agent agent, AgentConfiguration config) throws Exception {
this(new HashMap<>(), new HashMap<>());
extractActionsFromAgent(agent);
extractResourceProvidersFromAgent(agent);
+ this.config = config;
}
public Map<String, Action> getActions() {
@@ -141,6 +162,14 @@ public class AgentPlan implements Serializable {
return resource;
}
+ public AgentConfiguration getConfig() {
+ return config;
+ }
+
+ public Map<String, Object> getConfigData() {
+ return config.getConfData();
+ }
+
private void writeObject(ObjectOutputStream out) throws IOException {
String serializedStr = new ObjectMapper().writeValueAsString(this);
out.writeUTF(serializedStr);
@@ -152,6 +181,7 @@ public class AgentPlan implements Serializable {
this.actions = agentPlan.getActions();
this.actionsByEvent = agentPlan.getActionsByEvent();
this.resourceProviders = agentPlan.getResourceProviders();
+ this.config = agentPlan.getConfig();
this.resourceCache = new ConcurrentHashMap<>();
}
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
index f278678..5b8e8f8 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.agents.plan.serializer;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JacksonException;
@@ -29,6 +30,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Deseriali
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
@@ -119,6 +121,18 @@ public class AgentPlanJsonDeserializer extends
StdDeserializer<AgentPlan> {
}
}
- return new AgentPlan(actions, actionsByEvent, resourceProviders);
+ // Deserialize config data
+ JsonNode configNode = node.get("config");
+ Map<String, Object> configData = new HashMap<>();
+ if (configNode != null && configNode.isObject()) {
+ JsonNode configDataNode = configNode.get("conf_data");
+ if (configDataNode != null && configDataNode.isObject()) {
+ ObjectMapper mapper = new ObjectMapper();
+ configData = mapper.convertValue(configDataNode, Map.class);
+ }
+ }
+ AgentConfiguration config = new AgentConfiguration(configData);
+
+ return new AgentPlan(actions, actionsByEvent, resourceProviders,
config);
}
}
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
index 05f2ea6..ca61c79 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
@@ -105,5 +105,24 @@ public class AgentPlanJsonSerializer extends
StdSerializer<AgentPlan> {
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndObject();
+
+ // Serialize config data
+ jsonGenerator.writeFieldName("config");
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("conf_data");
+ jsonGenerator.writeStartObject();
+ agentPlan
+ .getConfigData()
+ .forEach(
+ (key, value) -> {
+ try {
+ jsonGenerator.writeFieldName(key);
+ jsonGenerator.writeObject(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ jsonGenerator.writeEndObject();
+ jsonGenerator.writeEndObject();
}
}
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java
new file mode 100644
index 0000000..125c737
--- /dev/null
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.junit.jupiter.api.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AgentConfigurationTest {
+
+ @Test
+ void testGetInt() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("int_key", 42);
+ data.put("str_key", "123");
+ data.put("invalid_key", "not_an_int");
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertEquals(Integer.valueOf(42), config.getInt("int_key", null));
+ assertEquals(Integer.valueOf(123), config.getInt("str_key", null));
+ assertNull(config.getInt("missing_key", null));
+ assertEquals(Integer.valueOf(999), config.getInt("missing_key", 999));
+
+ assertThrows(
+ NumberFormatException.class,
+ () -> {
+ config.getInt("invalid_key", null);
+ });
+ }
+
+ @Test
+ void testGetLong() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("long_key", 123456789012L);
+ data.put("str_long_key", "9876543210");
+ data.put("invalid_long_key", "not_a_long");
+
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertEquals(Long.valueOf(123456789012L), config.getLong("long_key",
null));
+
+ assertEquals(Long.valueOf(9876543210L), config.getLong("str_long_key",
null));
+
+ assertNull(config.getLong("missing_key", null));
+
+ assertEquals(Long.valueOf(999999999999L),
config.getLong("missing_key", 999999999999L));
+
+ assertThrows(
+ NumberFormatException.class,
+ () -> {
+ config.getLong("invalid_long_key", null);
+ });
+ }
+
+ @Test
+ void testGetFloat() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("float_key", 3.14f);
+ data.put("int_key", 42);
+ data.put("str_key", "2.5");
+ data.put("invalid_key", "not_a_float");
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertEquals(Float.valueOf(3.14f), config.getFloat("float_key", null));
+ assertEquals(Float.valueOf(42.0f), config.getFloat("int_key", null));
+ assertEquals(Float.valueOf(2.5f), config.getFloat("str_key", null));
+ assertNull(config.getFloat("missing_key", null));
+ assertEquals(Float.valueOf(1.23f), config.getFloat("missing_key",
1.23f));
+
+ assertThrows(
+ NumberFormatException.class,
+ () -> {
+ config.getFloat("invalid_key", null);
+ });
+ }
+
+ @Test
+ void testGetDouble() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("double_key", 3.14);
+ data.put("int_key", 42);
+ data.put("str_key", "2.5");
+ data.put("invalid_key", "not_a_double");
+
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertEquals(Double.valueOf(3.14), config.getDouble("double_key",
null));
+
+ assertEquals(Double.valueOf(42.0), config.getDouble("int_key", null));
+
+ assertEquals(Double.valueOf(2.5), config.getDouble("str_key", null));
+
+ assertNull(config.getDouble("missing_key", null));
+
+ assertEquals(Double.valueOf(1.23), config.getDouble("missing_key",
1.23));
+
+ assertThrows(
+ NumberFormatException.class,
+ () -> {
+ config.getDouble("invalid_key", null);
+ });
+ }
+
+ @Test
+ void testGetBool() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("true_key", true);
+ data.put("false_key", false);
+ data.put("str_key", "true");
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertTrue(config.getBool("true_key", false));
+ assertFalse(config.getBool("false_key", true));
+ assertNull(config.getBool("missing_key", null));
+ assertTrue(config.getBool("missing_key", true));
+
+ // Note: Boolean.valueOf("true") is true in Java
+ assertTrue(config.getBool("str_key", null));
+ }
+
+ @Test
+ void testGetStr() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("str_key", "hello");
+ data.put("int_key", 42);
+ data.put("float_key", 3.14);
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ assertEquals("hello", config.getStr("str_key", null));
+ assertEquals("42", config.getStr("int_key", null));
+ assertEquals("3.14", config.getStr("float_key", null));
+ assertNull(config.getStr("missing_key", null));
+ assertEquals("default", config.getStr("missing_key", "default"));
+ }
+
+ @Test
+ void testGetWithConfigOption() {
+ Map<String, Object> data = new HashMap<>();
+ data.put("config.str", "config.value");
+ data.put("config.int", 6789);
+ data.put("config.float", "45.5");
+ data.put("config.boolean", true);
+
+ AgentConfiguration config = new AgentConfiguration(data);
+
+ ConfigOption<String> strOption =
+ new ConfigOption<>("config.str", String.class, "default_str");
+ ConfigOption<Integer> intOption = new ConfigOption<>("config.int",
Integer.class, 123);
+ ConfigOption<Long> longOption = new ConfigOption<>("config.int",
Long.class, 123L);
+ ConfigOption<Float> floatOption = new ConfigOption<>("config.float",
Float.class, 0.0f);
+ ConfigOption<Double> doubleOption = new ConfigOption<>("config.float",
Double.class, 0.0);
+ ConfigOption<Boolean> boolOption =
+ new ConfigOption<>("config.boolean", Boolean.class, false);
+
+ assertEquals("config.value", config.get(strOption));
+ assertEquals(Integer.valueOf(6789), config.get(intOption));
+ assertEquals(Long.valueOf(6789L), config.get(longOption));
+ assertEquals(Float.valueOf(45.5f), config.get(floatOption));
+ assertEquals(Double.valueOf(45.5), config.get(doubleOption));
+ assertEquals(Boolean.TRUE, config.get(boolOption));
+
+ ConfigOption<Integer> missingOption = new
ConfigOption<>("missing.key1", Integer.class, 22);
+ assertEquals(Integer.valueOf(22), config.get(missingOption));
+
+ ConfigOption<Integer> missingKey = new ConfigOption<>("missing.key2",
Integer.class, null);
+ assertNull(config.get(missingKey));
+ }
+
+ @Test
+ void testGetWithDefaultValue() {
+ ConfigOption<String> defaultStr =
+ new ConfigOption<>("default.str", String.class,
"default_value");
+ ConfigOption<Integer> defaultInt = new ConfigOption<>("default.int",
Integer.class, 100);
+ ConfigOption<Double> defaultDouble =
+ new ConfigOption<>("default.double", Double.class, 2.5);
+
+ AgentConfiguration config = new AgentConfiguration();
+
+ assertEquals("default_value", config.get(defaultStr));
+ assertEquals(Integer.valueOf(100), config.get(defaultInt));
+ assertEquals(Double.valueOf(2.5), config.get(defaultDouble));
+ }
+
+ @Test
+ void testGetWithNullAndDefault() {
+ ConfigOption<String> nullableStr =
+ new ConfigOption<>("nullable.str", String.class, "default");
+
+ AgentConfiguration config = new AgentConfiguration();
+ config.setStr("nullable.str", null);
+
+ assertEquals("default", config.get(nullableStr));
+ }
+}
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
index 8bb231f..2cd3018 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
@@ -28,7 +28,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -63,6 +65,14 @@ public class AgentPlanJsonDeserializerTest {
agentPlan.getActionsByEvent().get(InputEvent.class.getName()));
assertEquals(
List.of(secondAction),
agentPlan.getActionsByEvent().get(MyEvent.class.getName()));
+
+ // Check the flink agent config
+ Map<String, Object> configData = agentPlan.getConfigData();
+ assertThat(configData.keySet()).hasSize(4);
+ assertEquals(1, configData.get("key1"));
+ assertEquals(1.5, configData.get("key2"));
+ assertEquals(true, configData.get("key3"));
+ assertEquals("v1", configData.get("key4"));
}
private static class MyEvent extends Event {}
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
index c703cc4..ec18751 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.JavaFunction;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -190,8 +191,12 @@ public class AgentPlanJsonSerializerTest {
// Create a test agent
TestAgent agent = new TestAgent();
+ Map<String, Object> confData = new HashMap<>();
+ confData.put("config.key", "config.value");
+ AgentConfiguration agentConfiguration = new
AgentConfiguration(confData);
+
// Create AgentPlan from the agent
- AgentPlan agentPlan = new AgentPlan(agent);
+ AgentPlan agentPlan = new AgentPlan(agent, agentConfiguration);
// Serialize the agent plan to JSON
String json = new ObjectMapper().writeValueAsString(agentPlan);
@@ -199,6 +204,7 @@ public class AgentPlanJsonSerializerTest {
// Verify the JSON contains the expected fields
assertThat(json).contains("\"actions\":{");
assertThat(json).contains("\"actions_by_event\":{");
+ assertThat(json).contains("\"config\":{");
// Verify that the actions from @Action annotated methods are present
assertThat(json).contains("\"handleInputEvent\"");
@@ -217,5 +223,8 @@ public class AgentPlanJsonSerializerTest {
// Verify that regularMethod is not included (not annotated)
assertThat(json).doesNotContain("\"regularMethod\"");
+
+ // Verify that config data from AgentConfiguration is present
+
assertThat(json).contains("\"conf_data\":{\"config.key\":\"config.value\"}");
}
}
diff --git a/plan/src/test/resources/agent_plans/agent_plan.json
b/plan/src/test/resources/agent_plans/agent_plan.json
index 31af952..e66e20e 100644
--- a/plan/src/test/resources/agent_plans/agent_plan.json
+++ b/plan/src/test/resources/agent_plans/agent_plan.json
@@ -34,5 +34,13 @@
"org.apache.flink.agents.plan.serializer.AgentPlanJsonDeserializerTest$MyEvent":
[
"second_action"
]
+ },
+ "config": {
+ "conf_data": {
+ "key1": 1,
+ "key2": 1.5,
+ "key3": true,
+ "key4": "v1"
+ }
}
}
diff --git a/python/flink_agents/api/configuration.py
b/python/flink_agents/api/configuration.py
new file mode 100644
index 0000000..15d2766
--- /dev/null
+++ b/python/flink_agents/api/configuration.py
@@ -0,0 +1,169 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from abc import ABC, abstractmethod
+from typing import Any, Optional, Type
+
+
+class ConfigOption:
+ """A configuration option defines a configuration key with its type and
default
+ value.
+
+ Args:
+ key: The configuration key name
+ config_type: The expected type of the configuration value (int, float,
str,
+ or bool)
+ default: The default value for this configuration option
+ """
+
+ def __init__(self, key: str, config_type: Type[Any], default:
Optional[Any]=None) -> None:
+ """Initialize a configuration option."""
+ self._key = key
+ self._type = config_type
+ self._default_value = default
+ def get_key(self) -> str:
+ """Gets the configuration key."""
+ return self._key
+
+ def get_type(self) -> Type[Any]:
+ """Returns the type of the configuration value."""
+ return self._type
+
+ def get_default_value(self) -> Any:
+ """Returns the default value."""
+ return self._default_value
+
+class WritableConfiguration(ABC):
+ """Abstract base class providing write access to a configuration object.
+
+ This class enables modification of configuration settings.
+ """
+
+ @abstractmethod
+ def set_str(self, key: str, value: str) -> None:
+ """Set the string configuration value using the key.
+
+ Args:
+ key: The configuration key to set
+ value: The string value to set for the key
+ """
+
+ @abstractmethod
+ def set_int(self, key: str, value: int) -> None:
+ """Set the int configuration value using the key.
+
+ Args:
+ key: The configuration key to set
+ value: The integer value to set for the key
+ """
+
+ @abstractmethod
+ def set_float(self, key: str, value: float) -> None:
+ """Set the float configuration value using the key.
+
+ Args:
+ key: The configuration key to set
+ value: The float value to set for the key
+ """
+
+ @abstractmethod
+ def set_bool(self, key: str, value: bool) -> None: # noqa: FBT001
+ """Set the boolean configuration value using the key.
+
+ Args:
+ key: The configuration key to set
+ value: The boolean value to set for the key
+ """
+
+ @abstractmethod
+ def set(self, option: ConfigOption, value: Any) -> None:
+ """Set the configuration value using the ConfigOption.
+
+ Args:
+ option: The config option to set
+ value: The value to set for the key
+ """
+
+class ReadableConfiguration(ABC):
+ """Abstract base class providing read access to a configuration object.
+
+ This class enables retrieval of configuration settings.
+ """
+
+ @abstractmethod
+ def get_int(self, key: str, default: Optional[int]=None) -> int:
+ """Get the int configuration value by key.
+
+ Args:
+ key: The configuration key to retrieve
+ default: The default value to return if key is not found
+
+ Returns:
+ The integer value associated with the key or the default value
+ """
+
+ @abstractmethod
+ def get_float(self, key: str, default: Optional[float]=None) -> float:
+ """Get the float configuration value by key.
+
+ Args:
+ key: The configuration key to retrieve
+ default: The default value to return if key is not found
+
+ Returns:
+ The float value associated with the key or the default value
+ """
+
+ @abstractmethod
+ def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+ """Get the boolean configuration value by key.
+
+ Args:
+ key: The configuration key to retrieve
+ default: The default value to return if key is not found
+
+ Returns:
+ The boolean value associated with the key or the default value
+ """
+
+ @abstractmethod
+ def get_str(self, key: str, default: Optional[str]=None) -> str:
+ """Get the string configuration value by key.
+
+ Args:
+ key: The configuration key to retrieve
+ default: The default value to return if key is not found
+
+ Returns:
+ The string value associated with the key or the default value
+ """
+
+ @abstractmethod
+ def get(self, option: ConfigOption) -> Any:
+ """Get the configuration value by ConfigOption.
+
+ Args:
+ option: The metadata of the option to read
+
+ Returns:
+ The value of the given option
+ """
+
+class Configuration(WritableConfiguration, ReadableConfiguration, ABC):
+ """A configuration that provides both read and write access to its
+ settings.
+ """
diff --git a/python/flink_agents/api/core_options.py
b/python/flink_agents/api/core_options.py
new file mode 100644
index 0000000..5599e24
--- /dev/null
+++ b/python/flink_agents/api/core_options.py
@@ -0,0 +1,70 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any
+
+from pyflink.java_gateway import get_gateway
+
+from flink_agents.api.configuration import ConfigOption
+
+
+def covert_j_option_to_python_option(j_option: Any) -> ConfigOption:
+ """Convert a Java config option to a Python config option."""
+ key = j_option.getKey()
+ default = j_option.getDefaultValue()
+ type_name = j_option.getTypeName()
+
+ if type_name == "java.lang.String":
+ config_type = str
+ elif type_name == "java.lang.Integer":
+ config_type = int
+ elif type_name == "java.lang.Long":
+ config_type = int
+ elif type_name == "java.lang.Boolean":
+ config_type = bool
+ elif type_name == "java.lang.Float":
+ config_type = float
+ elif type_name == "java.lang.Double":
+ config_type = float
+ else:
+ msg = f"Unsupported type: {type_name}"
+ raise TypeError(msg)
+
+ return ConfigOption(key, config_type, default)
+
+
+class AgentConfigOptionsMeta(type):
+ """Metaclass for AgentConfigOptions."""
+ def __init__(cls, name: str, bases: tuple[type, ...], attrs: dict[str,
Any]) -> None:
+ """Initialize the metaclass for AgentConfigOptions."""
+ super().__init__(name, bases, attrs)
+
+ jvm = get_gateway().jvm
+ cls.jvm = jvm
+
+ def __getattr__(cls, item: str) -> ConfigOption:
+ j_option = getattr(
+
cls.jvm.org.apache.flink.agents.api.configuration.AgentConfigOptions,
+ item,
+ )
+
+ python_option = covert_j_option_to_python_option(j_option)
+ return python_option
+
+
+class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
+ """CoreOptions to manage core configuration parameters for Flink Agents."""
diff --git a/python/flink_agents/api/execution_environment.py
b/python/flink_agents/api/execution_environment.py
index e6d9efd..b2eef8c 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -24,6 +24,7 @@ from pyflink.datastream import DataStream, KeySelector,
StreamExecutionEnvironme
from pyflink.table import Schema, StreamTableEnvironment, Table
from flink_agents.api.agent import Agent
+from flink_agents.api.configuration import Configuration
class AgentBuilder(ABC):
@@ -108,6 +109,16 @@ class AgentsExecutionEnvironment(ABC):
"flink_agents.runtime.remote_execution_environment"
).create_instance(env=env, **kwargs)
+ @abstractmethod
+ def get_config(self, path: Optional[str] = None) -> Configuration:
+ """Get the writable configuration for flink agents.
+
+ Returns:
+ -------
+ Configuration
+ The configuration for flink agents.
+ """
+
@abstractmethod
def from_list(self, input: List[Dict[str, Any]]) -> AgentBuilder:
"""Set input for agents. Used for local execution.
diff --git a/python/flink_agents/api/runner_context.py
b/python/flink_agents/api/runner_context.py
index 03e7954..c822a49 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -18,6 +18,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
+from flink_agents.api.configuration import ReadableConfiguration
from flink_agents.api.events.event import Event
from flink_agents.api.metric_group import MetricGroup
from flink_agents.api.resource import Resource, ResourceType
@@ -107,3 +108,13 @@ class RunnerContext(ABC):
Any
The result of the function.
"""
+
+ @abstractmethod
+ def get_config(self) -> ReadableConfiguration:
+ """Get the readable configuration for flink agents.
+
+ Returns:
+ -------
+ ReadableConfiguration
+ The configuration for flink agents.
+ """
diff --git a/python/flink_agents/plan/agent_plan.py
b/python/flink_agents/plan/agent_plan.py
index 83f4ef8..c697fc3 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -24,6 +24,7 @@ from flink_agents.api.resource import Resource, ResourceType
from flink_agents.plan.actions.action import Action
from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
from flink_agents.plan.actions.tool_call_action import TOOL_CALL_ACTION
+from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.plan.function import PythonFunction
from flink_agents.plan.resource_provider import (
JavaResourceProvider,
@@ -53,6 +54,7 @@ class AgentPlan(BaseModel):
actions: Dict[str, Action]
actions_by_event: Dict[str, List[str]]
resource_providers: Optional[Dict[ResourceType, Dict[str,
ResourceProvider]]] = None
+ config: Optional[AgentConfiguration] = None
__resources: Dict[ResourceType, Dict[str, Resource]] = {}
@field_serializer("resource_providers")
@@ -116,7 +118,7 @@ class AgentPlan(BaseModel):
return self
@staticmethod
- def from_agent(agent: Agent) -> "AgentPlan":
+ def from_agent(agent: Agent, config: AgentConfiguration) -> "AgentPlan":
"""Build a AgentPlan from user defined agent."""
actions = {}
actions_by_event = {}
@@ -142,6 +144,7 @@ class AgentPlan(BaseModel):
actions=actions,
actions_by_event=actions_by_event,
resource_providers=resource_providers,
+ config=config,
)
def get_actions(self, event_type: str) -> List[Action]:
@@ -173,7 +176,7 @@ class AgentPlan(BaseModel):
self.__resources[type] = {}
if name not in self.__resources[type]:
resource_provider = self.resource_providers[type][name]
- resource =
resource_provider.provide(get_resource=self.get_resource)
+ resource =
resource_provider.provide(get_resource=self.get_resource, config=self.config)
self.__resources[type][name] = resource
return self.__resources[type][name]
diff --git a/python/flink_agents/plan/configuration.py
b/python/flink_agents/plan/configuration.py
new file mode 100644
index 0000000..b327c13
--- /dev/null
+++ b/python/flink_agents/plan/configuration.py
@@ -0,0 +1,173 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional, Type
+
+import yaml
+from pydantic import BaseModel
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+ ConfigOption,
+ Configuration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str,
Any]:
+ """Flatten a nested dictionary into a single-level dictionary.
+
+ This function recursively traverses the dictionary, converting multi-level
+ nested key-value pairs into a single-level structure, where nested levels
+ are represented by joining key names with the specified separator.
+
+ Args:
+ d (Dict): The nested dictionary to be flattened
+ parent_key (str): The parent key name, used in recursion to track the
+ upper-level key path. Defaults to an empty string.
+ sep (str): The separator used to join parent and child keys.
+ Defaults to dot ('.').
+
+ Returns:
+ Dict[str, Any]: A flattened single-level dictionary where keys from
+ the original nested structure are joined with the
separator
+ """
+ items = {}
+ for k, v in d.items():
+ new_key = f"{parent_key}{sep}{k}" if parent_key else k
+ if isinstance(v, dict):
+ items.update(flatten_dict(v, new_key, sep=sep))
+ else:
+ items[new_key] = v
+ return items
+
+class AgentConfiguration(BaseModel, Configuration):
+ """Base class for config objects in the system.
+ Provides a flat dict interface to access nested config values.
+ """
+
+ conf_data: Dict[str, Any]
+
+ def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+ """Initialize with optional configuration data."""
+ if conf_data is None:
+ super().__init__(conf_data = {})
+ else:
+ super().__init__(conf_data = conf_data)
+
+ def get_value_with_type(self, key: str, config_type: Type[Any], default:
Any) -> Any:
+ """Helper method for all the get_xxx functions to avoid duplicate code.
+
+ Args:
+ key: The configuration key to retrieve
+ config_type: The expected type of the configuration value (int,
float, str,
+ or bool)
+ default: The default value to return if key is not found
+
+ Returns:
+ The value associated with the key or the default value
+ """
+ value = self.conf_data.get(key)
+ if value is None:
+ return default
+
+ try:
+ return config_type(value)
+ except (ValueError, TypeError) as e:
+ msg = f"Invalid value for {key}: {value}"
+ raise ValueError(msg) from e
+
+ @override
+ def get_int(self, key: str, default: Optional[int]=None) -> int:
+ return self.get_value_with_type(key, int, default)
+
+ @override
+ def get_float(self, key: str, default: Optional[float]=None) -> float:
+ return self.get_value_with_type(key, float, default)
+
+ @override
+ def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+ return self.get_value_with_type(key, bool, default)
+
+ @override
+ def get_str(self, key: str, default: Optional[str]=None) -> str:
+ return self.get_value_with_type(key, str, default)
+
+ @override
+ def get(self, option: ConfigOption) -> Any:
+ return self.get_value_with_type(option.get_key(), option.get_type(),
option.get_default_value())
+
+ @override
+ def set_str(self, key: str, value: str) -> None:
+ self.conf_data[key] = value
+
+ @override
+ def set_int(self, key: str, value: int) -> None:
+ self.conf_data[key] = value
+
+ @override
+ def set_float(self, key: str, value: float) -> None:
+ self.conf_data[key] = value
+
+ @override
+ def set_bool(self, key: str, value: bool) -> None:
+ self.conf_data[key] = value
+
+ @override
+ def set(self, option: ConfigOption, value: Any) -> None:
+ self.conf_data[option.get_key()] = value
+
+ def load_from_file(self, config_path: Optional[str] = None) -> None:
+ """Load configuration from a YAML file and update current
configuration data.
+
+ Args:
+ config_path (str, optional): Path to the configuration file.
+ """
+ if config_path:
+ path = Path(config_path)
+ with path.open() as f:
+ raw_config = yaml.safe_load(f)
+ self.conf_data.update(flatten_dict(raw_config.get('agent',
{})))
+
+ def get_conf_data(self) -> dict:
+ """Get the configuration data dictionary.
+
+ Returns:
+ dict: A dictionary containing all configuration items
+ """
+ return self.conf_data
+
+ def get_config_data_by_prefix(self, prefix: str) -> dict:
+ """Extract configuration items for a specific prefix from the
configuration
+ data.
+
+ Parameters:
+ prefix: The prefix for the key of configuration items.
+
+ Returns:
+ dict: A dictionary containing the configuration items whose keys
+ start with "<prefix>.". The keys are the configuration item names
+ with that leading "<prefix>." removed, and the values are the
+ corresponding configuration values.
+ """
+ prefix = f"{prefix}."
+ result = {}
+ for key, value in self.conf_data.items():
+ if key.startswith(prefix):
+ sub_key = key[len(prefix) :]
+ result[sub_key] = value
+ return result
diff --git a/python/flink_agents/plan/resource_provider.py
b/python/flink_agents/plan/resource_provider.py
index fbcaa2d..61a15dc 100644
--- a/python/flink_agents/plan/resource_provider.py
+++ b/python/flink_agents/plan/resource_provider.py
@@ -27,6 +27,7 @@ from flink_agents.api.resource import (
ResourceType,
SerializableResource,
)
+from flink_agents.plan.configuration import AgentConfiguration
class ResourceProvider(BaseModel, ABC):
@@ -45,13 +46,16 @@ class ResourceProvider(BaseModel, ABC):
type: ResourceType
@abstractmethod
- def provide(self, get_resource: Callable) -> Resource:
+ def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Create resource in runtime.
Parameters
----------
get_resource : Callable
The helper function to get other resource declared in the same
Agent.
+
+ config : AgentConfiguration
+ Configuration for Flink Agents.
"""
@@ -88,11 +92,21 @@ class PythonResourceProvider(ResourceProvider):
clazz: str
kwargs: Dict[str, Any]
- def provide(self, get_resource: Callable) -> Resource:
+ def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Create resource in runtime."""
module = importlib.import_module(self.module)
cls = getattr(module, self.clazz)
- resource = cls(**self.kwargs, get_resource=get_resource)
+
+ final_kwargs = {}
+
+ resource_class_config = config.get_config_data_by_prefix(self.clazz)
+ resource_config = config.get_config_data_by_prefix(self.name)
+
+ final_kwargs.update(self.kwargs)
+ final_kwargs.update(resource_class_config)
+ final_kwargs.update(resource_config)
+
+ resource = cls(**final_kwargs, get_resource=get_resource)
return resource
@@ -124,7 +138,7 @@ class
PythonSerializableResourceProvider(SerializableResourceProvider):
resource=resource,
)
- def provide(self, get_resource: Callable) -> Resource:
+ def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Get or deserialize resource in runtime."""
if self.resource is None:
module = importlib.import_module(self.module)
@@ -140,7 +154,7 @@ class JavaResourceProvider(ResourceProvider):
Currently, this class only used for deserializing Java agent plan json
"""
- def provide(self, get_resource: Callable) -> Resource:
+ def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Create resource in runtime."""
err_msg = (
"Currently, flink-agents doesn't support create resource "
@@ -156,7 +170,7 @@ class
JavaSerializableResourceProvider(SerializableResourceProvider):
Currently, this class only used for deserializing Java agent plan json
"""
- def provide(self, get_resource: Callable) -> Resource:
+ def provide(self, get_resource: Callable, config: AgentConfiguration) ->
Resource:
"""Get or deserialize resource in runtime."""
err_msg = (
"Currently, flink-agents doesn't support create resource "
diff --git
a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
similarity index 50%
copy from
python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
copy to
python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
index cde375c..3aa094b 100644
--- a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
+++
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
@@ -15,21 +15,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-import sys
from pathlib import Path
-from flink_agents.plan.agent_plan import AgentPlan
-from
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_agent
import (
- PythonAgentPlanCompatibilityTestAgent,
-)
+from pyflink.util.java_utils import add_jars_to_context_class_loader
-# The agent plan json will be checked by
-# flink-agents/plan/src/test/java/org/apache/flink/agents/plan
-# /compatibility/GenerateAgentPlanJson.java
-# correspond modification should be applied to it when modify this file.
+from flink_agents.api.core_options import AgentConfigOptions
+
+# This script is used to verify that Java-defined configuration options
+# (e.g., AgentConfigOptions) are correctly exposed and accessible in the
+# Python environment via the JAR file. It loads a Java JAR into the Python
+# context and performs basic assertions on the configuration keys, types,
+# and default values to ensure compatibility between Java and Python layers.
+#
+# The JAR file path is relative to this script and should be updated if
+# the build structure changes.
if __name__ == "__main__":
- json_path = sys.argv[1]
- agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent())
- json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
- with Path(json_path).open("w") as f:
- f.write(json_value)
+ current_dir = Path(__file__).parent
+ add_jars_to_context_class_loader(
+ [
+
f"file:///{current_dir}/../../../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar"
+ ]
+ )
+
+ assert AgentConfigOptions.BASE_LOG_DIR.get_key() == "baseLogDir"
+ assert AgentConfigOptions.BASE_LOG_DIR.get_type() is str
+ assert AgentConfigOptions.BASE_LOG_DIR.get_default_value() is None
diff --git
a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
b/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
index cde375c..24ad9cf 100644
--- a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
+++ b/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
@@ -19,6 +19,7 @@ import sys
from pathlib import Path
from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
from
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_agent
import (
PythonAgentPlanCompatibilityTestAgent,
)
@@ -29,7 +30,7 @@ from
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_
# correspond modification should be applied to it when modify this file.
if __name__ == "__main__":
json_path = sys.argv[1]
- agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent())
+ agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent(),
AgentConfiguration())
json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
with Path(json_path).open("w") as f:
f.write(json_value)
diff --git a/python/flink_agents/plan/tests/resources/agent_plan.json
b/python/flink_agents/plan/tests/resources/agent_plan.json
index 86780bd..dd2a382 100644
--- a/python/flink_agents/plan/tests/resources/agent_plan.json
+++ b/python/flink_agents/plan/tests/resources/agent_plan.json
@@ -81,5 +81,10 @@
"__resource_provider_type__": "PythonResourceProvider"
}
}
+ },
+ "config": {
+ "conf_data": {
+ "mock.key": "mock.value"
+ }
}
}
\ No newline at end of file
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py
b/python/flink_agents/plan/tests/test_agent_plan.py
index fa8cdff..20d61d8 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -29,6 +29,7 @@ from flink_agents.api.events.event import Event, InputEvent,
OutputEvent
from flink_agents.api.resource import Resource, ResourceType
from flink_agents.api.runner_context import RunnerContext
from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.plan.function import PythonFunction
@@ -43,7 +44,7 @@ class AgentForTest(Agent): # noqa D101
def test_from_agent(): # noqa D102
agent = AgentForTest()
- agent_plan = AgentPlan.from_agent(agent)
+ agent_plan = AgentPlan.from_agent(agent, AgentConfiguration())
event_type = f"{InputEvent.__module__}.{InputEvent.__name__}"
actions = agent_plan.get_actions(event_type)
assert len(actions) == 1
@@ -66,7 +67,7 @@ class InvalidAgent(Agent): # noqa D101
def test_to_agent_invalid_signature() -> None: # noqa D103
agent = InvalidAgent()
with pytest.raises(TypeError):
- AgentPlan.from_agent(agent)
+ AgentPlan.from_agent(agent, AgentConfiguration())
class MyEvent(Event):
@@ -116,7 +117,7 @@ class MyAgent(Agent): # noqa: D101
@pytest.fixture(scope="module")
def agent_plan() -> AgentPlan: # noqa: D103
- return AgentPlan.from_agent(MyAgent())
+ return AgentPlan.from_agent(MyAgent(), AgentConfiguration({"mock.key":
"mock.value"}))
current_dir = Path(__file__).parent
@@ -139,7 +140,7 @@ def test_agent_plan_deserialize(agent_plan: AgentPlan) ->
None: # noqa: D103
def test_get_resource() -> None: # noqa: D103
- agent_plan = AgentPlan.from_agent(MyAgent())
+ agent_plan = AgentPlan.from_agent(MyAgent(), AgentConfiguration())
mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL)
assert (
mock.chat(ChatMessage(role=MessageRole.USER, content="")).content
@@ -162,7 +163,7 @@ def test_add_action_and_resource_to_agent() -> None: #
noqa: D103
desc="mock resource just for testing.",
connection="mock",
)
- agent_plan = AgentPlan.from_agent(my_agent)
+ agent_plan = AgentPlan.from_agent(my_agent,
AgentConfiguration({"mock.key": "mock.value"}))
json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f:
expected_json = f.read()
diff --git a/python/flink_agents/plan/tests/test_configuration.py
b/python/flink_agents/plan/tests/test_configuration.py
new file mode 100644
index 0000000..7a874ed
--- /dev/null
+++ b/python/flink_agents/plan/tests/test_configuration.py
@@ -0,0 +1,228 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import tempfile
+from pathlib import Path
+
+import pytest
+import yaml
+
+from flink_agents.api.configuration import ConfigOption
+from flink_agents.plan.configuration import AgentConfiguration
+
+
+def test_load_configuration_from_file() -> None:
+ """Test loading configuration from a YAML file."""
+ # Create a temporary YAML file with test data
+ test_data = {
+ "agent": {
+ "database": {
+ "host": "localhost",
+ "port": 5432,
+ "credentials": {"username": "admin", "password": "secret"},
+ },
+ "api": {"endpoint": "/api/v1", "timeout": 30.0},
+ "debug": True,
+ }
+ }
+
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False)
as f:
+ yaml.dump(test_data, f)
+ config_file = f.name
+
+ try:
+ # Load configuration
+ config = AgentConfiguration()
+ config.load_from_file(config_file)
+
+ # Entries under the top-level "agent" section are flattened into dotted keys (without the "agent." prefix)
+ assert config.get_str('database.host') == 'localhost'
+ assert config.get_int('database.port') == 5432
+ assert config.get_str('database.credentials.username') == 'admin'
+ assert config.get_str('database.credentials.password') == 'secret'
+ assert config.get_str('api.endpoint') == '/api/v1'
+ assert config.get_float('api.timeout') == 30.0
+ assert config.get_bool('debug') is True
+ finally:
+ config_file = Path(config_file)
+ config_file.unlink()
+
+
+def test_load_configuration_without_path() -> None:
+ """Test loading configuration without a path (should not change
_conf_data)."""
+ config = AgentConfiguration()
+ # Initially _conf_data should be empty or unchanged
+ original_conf_data = config.get_conf_data().copy()
+ config.load_from_file(None)
+ # _conf_data should remain unchanged
+ assert config.get_conf_data() == original_conf_data
+
+
+def test_load_configuration_with_invalid_file() -> None:
+ """Test loading configuration with a non-existent file."""
+ config = AgentConfiguration()
+ with pytest.raises(FileNotFoundError):
+ config.load_from_file('/path/to/nonexistent/file.yaml')
+
+
+def test_load_configuration_with_invalid_yaml() -> None:
+ """Test loading configuration with invalid YAML content."""
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False)
as f:
+ f.write('invalid: yaml: content: [')
+ config_file = f.name
+
+ try:
+ config = AgentConfiguration()
+ with pytest.raises(yaml.YAMLError):
+ config.load_from_file(config_file)
+ finally:
+ config_file = Path(config_file)
+ config_file.unlink()
+
+
+def test_get_int() -> None:
+ """Test get_int method with various inputs."""
+ config = AgentConfiguration({'int_key': 42, 'str_key': '123',
'invalid_key': 'not_an_int'})
+
+ # Test normal integer value
+ assert config.get_int('int_key') == 42
+
+ # Test string that can be converted to int
+ assert config.get_int('str_key') == 123
+
+ # Test default value when key is not found
+ assert config.get_int('missing_key', 999) == 999
+
+ # Test that None is returned when the key is missing and no default is given
+ assert config.get_int('missing_key') is None
+
+ # Test invalid value that cannot be converted to int
+ with pytest.raises(ValueError, match="Invalid value for invalid_key:
not_an_int"):
+ config.get_int('invalid_key')
+
+
+def test_get_float() -> None:
+ """Test get_float method with various inputs."""
+ config = AgentConfiguration({'float_key': 3.14, 'int_key': 42, 'str_key':
'2.5', 'invalid_key': 'not_a_float'})
+
+ # Test normal float value
+ assert config.get_float('float_key') == 3.14
+
+ # Test int value converted to float
+ assert config.get_float('int_key') == 42.0
+
+ # Test string that can be converted to float
+ assert config.get_float('str_key') == 2.5
+
+ # Test default value when key is not found
+ assert config.get_float('missing_key', 1.23) == 1.23
+
+ # Test that None is returned when the key is missing and no default is given
+ assert config.get_float('missing_key') is None
+
+ # Test invalid value that cannot be converted to float
+ with pytest.raises(ValueError, match="Invalid value for invalid_key:
not_a_float"):
+ config.get_float('invalid_key')
+
+
+def test_get_bool() -> None:
+ """Test get_bool method with various inputs."""
+ config = AgentConfiguration({'bool_key': True, 'false_key': False,
'str_key': 'true'})
+
+ # Test normal boolean values
+ assert config.get_bool('bool_key') is True
+ assert config.get_bool('false_key') is False
+
+ # Test default value when key is not found
+ assert config.get_bool('missing_key', True) is True
+
+ # Test that None is returned when the key is missing and no default is given
+ assert config.get_bool('missing_key') is None
+
+ # Note: Python's bool() treats any non-empty string as truthy, so 'true' yields True
+ # (presumably 'false' would as well); that is Python behavior, not a bug in our code
+ assert config.get_bool('str_key') is True
+
+
+def test_get_str() -> None:
+ """Test get_str method with various inputs."""
+ config = AgentConfiguration({'str_key': 'hello', 'int_key': 42,
'float_key': 3.14})
+
+ # Test normal string value
+ assert config.get_str('str_key') == 'hello'
+
+ # Test int value converted to string
+ assert config.get_str('int_key') == '42'
+
+ # Test float value converted to string
+ assert config.get_str('float_key') == '3.14'
+
+ # Test default value when key is not found
+ assert config.get_str('missing_key', 'default') == 'default'
+
+ # Test that None is returned when the key is missing and no default is given
+ assert config.get_str('missing_key') is None
+
+ # 'none_key' was never set, so this is another missing-key lookup that returns None
+ assert config.get_str('none_key') is None
+
+def test_get_with_config_option() -> None: # noqa: D103
+ data = {
+ "config.str": "config.value",
+ "config.int": 6789,
+ "config.float": "45.5",
+ "config.boolean": True,
+ }
+
+ config = AgentConfiguration(data)
+
+ str_option = ConfigOption("config.str", str, "default_str")
+ int_option = ConfigOption("config.int", int, 123)
+ float_option = ConfigOption("config.float", float, 0.0)
+ bool_option = ConfigOption("config.boolean", bool, False)
+
+ assert config.get(str_option) == "config.value"
+ assert config.get(int_option) == 6789
+ assert config.get(float_option) == 45.5
+ assert config.get(bool_option) is True
+
+ missing_option = ConfigOption("missing.key1", int, 22)
+ assert config.get(missing_option) == 22
+
+ missing_key = ConfigOption("missing.key2", int, None)
+ assert config.get(missing_key) is None
+
+
+def test_get_with_default_value() -> None: # noqa: D103
+ default_str = ConfigOption("default.str", str, "default_value")
+ default_int = ConfigOption("default.int", int, 100)
+ default_double = ConfigOption("default.double", float, 2.5)
+
+ config = AgentConfiguration()
+
+ assert config.get(default_str) == "default_value"
+ assert config.get(default_int) == 100
+ assert config.get(default_double) == 2.5
+
+
+def test_get_with_null_and_default() -> None: # noqa: D103
+ nullable_str = ConfigOption("nullable.str", str, "default")
+
+ config = AgentConfiguration()
+ config.set_str("nullable.str", None)
+
+ assert config.get(nullable_str) == "default"
diff --git a/python/flink_agents/runtime/flink_runner_context.py
b/python/flink_agents/runtime/flink_runner_context.py
index 33ffe64..f15d9d9 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -22,6 +22,7 @@ from typing import Any, Callable, Dict, Tuple
import cloudpickle
from typing_extensions import override
+from flink_agents.api.configuration import ReadableConfiguration
from flink_agents.api.events.event import Event
from flink_agents.api.resource import Resource, ResourceType
from flink_agents.api.runner_context import RunnerContext
@@ -129,6 +130,17 @@ class FlinkRunnerContext(RunnerContext):
yield
return future.result()
+ @override
+ def get_config(self) -> ReadableConfiguration:
+ """Get the readable configuration for flink agents.
+
+ Returns:
+ -------
+ ReadableConfiguration
+ The configuration for flink agents.
+ """
+ return self.__agent_plan.config
+
def create_flink_runner_context(
j_runner_context: Any, agent_plan_json: str, executor: ThreadPoolExecutor
diff --git a/python/flink_agents/runtime/local_execution_environment.py
b/python/flink_agents/runtime/local_execution_environment.py
index 395ff5a..aa5605f 100644
--- a/python/flink_agents/runtime/local_execution_environment.py
+++ b/python/flink_agents/runtime/local_execution_environment.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
from pyflink.common import TypeInformation
from pyflink.datastream import DataStream, KeySelector,
StreamExecutionEnvironment
@@ -26,6 +26,7 @@ from flink_agents.api.execution_environment import (
AgentBuilder,
AgentsExecutionEnvironment,
)
+from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.runtime.local_runner import LocalRunner
@@ -37,15 +38,16 @@ class LocalAgentBuilder(AgentBuilder):
__output: List[Any]
__runner: LocalRunner = None
__executed: bool = False
+ __config: AgentConfiguration
def __init__(
- self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]]
+ self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]],
config: AgentConfiguration
) -> None:
"""Init empty output list."""
self.__env = env
self.__input = input
self.__output = []
-
+ self.__config = config
def apply(self, agent: Agent) -> AgentBuilder:
"""Create local runner to execute given agent.
@@ -54,7 +56,7 @@ class LocalAgentBuilder(AgentBuilder):
if self.__runner is not None:
err_msg = "LocalAgentBuilder doesn't support apply multiple
agents."
raise RuntimeError(err_msg)
- self.__runner = LocalRunner(agent)
+ self.__runner = LocalRunner(agent, self.__config)
self.__env.set_agent(self.__input, self.__output, self.__runner)
return self
@@ -86,6 +88,13 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
__output: List[Any] = None
__runner: LocalRunner = None
__executed: bool = False
+ __config: AgentConfiguration = AgentConfiguration()
+
+ def get_config(self, path: Optional[str] = None) -> AgentConfiguration:
+ """Get configuration of execution environment."""
+ if path is not None:
+ self.__config.load_from_file(path)
+ return self.__config
def from_list(self, input: list) -> LocalAgentBuilder:
"""Set input list of execution environment."""
@@ -94,7 +103,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
raise RuntimeError(err_msg)
self.__input = input
- return LocalAgentBuilder(env=self, input=input)
+ return LocalAgentBuilder(env=self, input=input, config=self.__config)
def set_agent(self, input: list, output: list, runner: LocalRunner) ->
None:
"""Set agent input, output and runner."""
diff --git a/python/flink_agents/runtime/local_runner.py
b/python/flink_agents/runtime/local_runner.py
index bfca55f..a9ada75 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -29,6 +29,7 @@ from flink_agents.api.metric_group import MetricGroup
from flink_agents.api.resource import Resource, ResourceType
from flink_agents.api.runner_context import RunnerContext
from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.runtime.agent_runner import AgentRunner
from flink_agents.runtime.local_memory_object import LocalMemoryObject
@@ -58,8 +59,9 @@ class LocalRunnerContext(RunnerContext):
events: deque[Event]
_store: dict[str, Any]
_short_term_memory: MemoryObject
+ _config: AgentConfiguration
- def __init__(self, agent_plan: AgentPlan, key: Any) -> None:
+ def __init__(self, agent_plan: AgentPlan, key: Any, config:
AgentConfiguration) -> None:
"""Initialize a new context with the given agent and key.
Parameters
@@ -77,6 +79,7 @@ class LocalRunnerContext(RunnerContext):
self._short_term_memory = LocalMemoryObject(
self._store, LocalMemoryObject.ROOT_KEY
)
+ self._config = config
@property
def key(self) -> Any:
@@ -144,6 +147,10 @@ class LocalRunnerContext(RunnerContext):
yield
return func_result
+ @override
+ def get_config(self) -> AgentConfiguration:
+ return self._config
+
class LocalRunner(AgentRunner):
"""Agent runner implementation for local execution, which is
@@ -157,13 +164,16 @@ class LocalRunner(AgentRunner):
Dictionary of active contexts indexed by key.
__outputs:
Outputs generated by agent execution.
+ __config:
+ Internal configuration.
"""
__agent_plan: AgentPlan
__keyed_contexts: Dict[Any, LocalRunnerContext]
__outputs: List[Dict[str, Any]]
+ __config: AgentConfiguration
- def __init__(self, agent: Agent) -> None:
+ def __init__(self, agent: Agent, config: AgentConfiguration) -> None:
"""Initialize the runner with the provided agent.
Parameters
@@ -171,9 +181,10 @@ class LocalRunner(AgentRunner):
agent : Agent
The agent class to convert and run.
"""
- self.__agent_plan = AgentPlan.from_agent(agent)
+ self.__agent_plan = AgentPlan.from_agent(agent, config)
self.__keyed_contexts = {}
self.__outputs = []
+ self.__config = config
@override
def run(self, **data: Dict[str, Any]) -> Any:
@@ -197,7 +208,7 @@ class LocalRunner(AgentRunner):
key = uuid.uuid4()
if key not in self.__keyed_contexts:
- self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan,
key)
+ self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan,
key, self.__config)
context = self.__keyed_contexts[key]
if "value" in data:
diff --git a/python/flink_agents/runtime/remote_execution_environment.py
b/python/flink_agents/runtime/remote_execution_environment.py
index 8b8c0fe..6faf3b0 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import os
+from pathlib import Path
from typing import Any, Dict, List, Optional
import cloudpickle
@@ -37,6 +39,7 @@ from flink_agents.api.execution_environment import (
AgentsExecutionEnvironment,
)
from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
class RemoteAgentBuilder(AgentBuilder):
@@ -46,13 +49,15 @@ class RemoteAgentBuilder(AgentBuilder):
__agent_plan: AgentPlan = None
__output: DataStream = None
__t_env: StreamTableEnvironment
+ __config: AgentConfiguration
def __init__(
- self, input: DataStream, t_env: Optional[StreamTableEnvironment] = None
+ self, input: DataStream, config: AgentConfiguration, t_env:
Optional[StreamTableEnvironment] = None
) -> None:
"""Init method of RemoteAgentBuilder."""
self.__input = input
self.__t_env = t_env
+ self.__config = config
def apply(self, agent: Agent) -> "AgentBuilder":
"""Set agent of execution environment.
@@ -65,7 +70,7 @@ class RemoteAgentBuilder(AgentBuilder):
if self.__agent_plan is not None:
err_msg = "RemoteAgentBuilder doesn't support apply multiple
agents yet."
raise RuntimeError(err_msg)
- self.__agent_plan = AgentPlan.from_agent(agent)
+ self.__agent_plan = AgentPlan.from_agent(agent, self.__config)
return self
def to_datastream(
@@ -133,10 +138,26 @@ class
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
"""Implementation of AgentsExecutionEnvironment for execution with
DataStream."""
__env: StreamExecutionEnvironment
+ __config: AgentConfiguration
def __init__(self, env: StreamExecutionEnvironment) -> None:
"""Init method of RemoteExecutionEnvironment."""
self.__env = env
+ self.__config = AgentConfiguration()
+ flink_conf_dir = os.environ.get("FLINK_CONF_DIR")
+ if flink_conf_dir is not None:
+ config_dir = Path(flink_conf_dir) / "config.yaml"
+ self.__config.load_from_file(str(config_dir))
+
+ def get_config(self, path: Optional[str] = None) -> AgentConfiguration:
+ """Get the writable configuration for flink agents.
+
+ Returns:
+ -------
+ AgentConfiguration
+ The configuration for flink agents.
+ """
+ return self.__config
@staticmethod
def __process_input_datastream(
@@ -166,7 +187,7 @@ class
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
"""
input = self.__process_input_datastream(input, key_selector)
- return RemoteAgentBuilder(input=input)
+ return RemoteAgentBuilder(input=input, config=self.__config)
def from_table(
self,
@@ -190,7 +211,7 @@ class
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())
input = self.__process_input_datastream(input, key_selector)
- return RemoteAgentBuilder(input=input, t_env=t_env)
+ return RemoteAgentBuilder(input=input, config=self.__config,
t_env=t_env)
def from_list(self, input: List[Dict[str, Any]]) ->
"AgentsExecutionEnvironment":
"""Set input list of agent execution.
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 8ba7b83..a586a00 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
"apache-flink==1.20.1",
"pydantic==2.11.4",
"docstring-parser==0.16",
+ "pyyaml==6.0.2",
#TODO: Seperate integration dependencies from project
"ollama==0.4.8",
"dashscope~=1.24.2",
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 41644d5..a6c4e7a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.agents.runtime.context;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
import org.apache.flink.agents.api.context.MemoryObject;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.resource.Resource;
@@ -109,4 +110,9 @@ public class RunnerContextImpl implements RunnerContext {
}
return agentPlan.getResource(name, type);
}
+
+ @Override
+ public ReadableConfiguration getConfig() {
+ return agentPlan.getConfig();
+ }
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
index 424c2fd..f4f9e6a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
@@ -21,15 +21,20 @@ package org.apache.flink.agents.runtime.env;
import org.apache.flink.agents.api.Agent;
import org.apache.flink.agents.api.AgentBuilder;
import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.runtime.CompileUtils;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.YamlParserUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import java.io.File;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -43,8 +48,19 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
private final StreamExecutionEnvironment env;
+ private final AgentConfiguration config;
+
+ public static final String FLINK_CONF_FILENAME = "config.yaml";
+
public RemoteExecutionEnvironment(StreamExecutionEnvironment env) {
this.env = env;
+ final String configDir =
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ this.config = loadAgentConfiguration(configDir);
+ }
+
+ @Override
+ public AgentConfiguration getConfig() {
+ return config;
}
@Override
@@ -55,13 +71,13 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
@Override
public <T, K> AgentBuilder fromDataStream(DataStream<T> input,
KeySelector<T, K> keySelector) {
- return new RemoteAgentBuilder<>(input, keySelector, env);
+ return new RemoteAgentBuilder<>(input, keySelector, env, config);
}
@Override
public <K> AgentBuilder fromTable(
Table input, StreamTableEnvironment tableEnv, KeySelector<Object,
K> keySelector) {
- return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env);
+ return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env,
config);
}
@Override
@@ -69,6 +85,24 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
env.execute();
}
+ @SuppressWarnings("unchecked")
+ public static AgentConfiguration loadAgentConfiguration(String configDir) {
+ try {
+ if (configDir == null) {
+ return new AgentConfiguration();
+ }
+ final Map<String, Object> configData =
+ (Map<String, Object>)
+ YamlParserUtils.loadYamlFile(new File(configDir,
FLINK_CONF_FILENAME))
+ .getOrDefault("agent", new HashMap<>());
+
+ return new AgentConfiguration(configData);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to load Flink Agents configuration from " +
configDir, e);
+ }
+ }
+
/** Implementation of AgentBuilder for remote execution environment. */
private static class RemoteAgentBuilder<T, K> implements AgentBuilder {
@@ -76,6 +110,7 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
private final KeySelector<T, K> keySelector;
private final StreamExecutionEnvironment env;
private final StreamTableEnvironment tableEnv;
+ private final AgentConfiguration config;
private AgentPlan agentPlan;
private DataStream<Object> outputDataStream;
@@ -84,11 +119,13 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
public RemoteAgentBuilder(
DataStream<T> inputDataStream,
KeySelector<T, K> keySelector,
- StreamExecutionEnvironment env) {
+ StreamExecutionEnvironment env,
+ AgentConfiguration config) {
this.inputDataStream = inputDataStream;
this.keySelector = keySelector;
this.env = env;
this.tableEnv = null;
+ this.config = config;
}
// Constructor for Table input
@@ -97,17 +134,19 @@ public class RemoteExecutionEnvironment extends
AgentsExecutionEnvironment {
Table inputTable,
StreamTableEnvironment tableEnv,
KeySelector<Object, K> keySelector,
- StreamExecutionEnvironment env) {
+ StreamExecutionEnvironment env,
+ AgentConfiguration config) {
this.inputDataStream = (DataStream<T>)
tableEnv.toDataStream(inputTable);
this.keySelector = (KeySelector<T, K>) keySelector;
this.env = env;
this.tableEnv = tableEnv;
+ this.config = config;
}
@Override
public AgentBuilder apply(Agent agent) {
try {
- this.agentPlan = new AgentPlan(agent);
+ this.agentPlan = new AgentPlan(agent, config);
return this;
} catch (Exception e) {
throw new RuntimeException("Failed to create agent plan from
agent", e);
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java
new file mode 100644
index 0000000..4784a28
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.env;
+
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.UUID;
+
+import static
org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment.FLINK_CONF_FILENAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@code RemoteExecutionEnvironment.loadAgentConfiguration}, which is called here with
+ * a directory path and returns an {@link AgentConfiguration}.
+ */
+public class RemoteExecutionEnvironmentTest {
+    @TempDir private File tmpDir;
+
+    /**
+     * Writes a YAML file containing an {@code agent} section into the temp directory and checks
+     * the loaded entries: the {@code agent} prefix is not part of the keys, nested keys are
+     * flattened into dotted keys, and scalar values come back as string, integer, double and
+     * boolean values.
+     */
+    @Test
+    void testLoadAgentConfiguration() throws FileNotFoundException {
+        File confFile = new File(tmpDir, FLINK_CONF_FILENAME);
+
+        // The indentation inside the literals is significant: key2/key3 must be nested under
+        // key1 so that they are loaded as "key1.key2" / "key1.key3".
+        try (final PrintWriter pw = new PrintWriter(confFile)) {
+            pw.println("agent:");
+            pw.println("  key1: ");
+            pw.println("    key2: v1");
+            pw.println("    key3: 'v2'");
+            pw.println("  key4: 1");
+            pw.println("  key5: 'v3'");
+            pw.println("  key6: 1.5");
+            pw.println("  key7: true");
+        }
+
+        AgentConfiguration conf =
+                RemoteExecutionEnvironment.loadAgentConfiguration(tmpDir.getAbsolutePath());
+
+        // AssertJ assertions instead of the `assert` keyword: they run regardless of the JVM's
+        // -ea flag and report the actual value (instead of an NPE) when a key is missing.
+        assertThat(conf.getConfData().keySet()).hasSize(6);
+        assertThat(conf.getConfData().get("key1.key2")).isEqualTo("v1");
+        assertThat(conf.getConfData().get("key1.key3")).isEqualTo("v2");
+        assertThat(conf.getConfData().get("key4")).isEqualTo(1);
+        assertThat(conf.getConfData().get("key5")).isEqualTo("v3");
+        assertThat(conf.getConfData().get("key6")).isEqualTo(1.5);
+        assertThat(conf.getConfData().get("key7")).isEqualTo(true);
+    }
+
+    /**
+     * Loading from a randomly named path (not expected to exist) must fail with a {@link
+     * RuntimeException}.
+     */
+    @Test
+    void testLoadAgentConfigurationFailIfNotLoaded() {
+        assertThatThrownBy(
+                        () ->
+                                RemoteExecutionEnvironment.loadAgentConfiguration(
+                                        "/some/path/" + UUID.randomUUID()))
+                .isInstanceOf(RuntimeException.class);
+    }
+
+    /**
+     * Despite the "FailIf" in its name, this case expects no failure: a {@code null} directory
+     * yields a configuration with no entries.
+     */
+    @Test
+    void testLoadAgentConfigurationFailIfNull() {
+        AgentConfiguration conf = RemoteExecutionEnvironment.loadAgentConfiguration(null);
+        assertThat(conf.getConfData()).isEmpty();
+    }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
index 2b95f37..ca49b85 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.agents.runtime.memory;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
import org.apache.flink.agents.api.context.MemoryObject;
import org.apache.flink.agents.api.context.MemoryRef;
import org.apache.flink.agents.api.context.RunnerContext;
@@ -89,6 +90,11 @@ public class MemoryRefTest {
public Resource getResource(String name, ResourceType type) throws
Exception {
return null;
}
+
+ // Test stub: exposes no configuration and simply returns null, like the
+ // getResource stub directly above it.
+ @Override
+ public ReadableConfiguration getConfig() {
+ return null;
+ }
}
@BeforeEach
diff --git a/tools/e2e.sh b/tools/e2e.sh
old mode 100644
new mode 100755
index f967c34..1a15cfb
--- a/tools/e2e.sh
+++ b/tools/e2e.sh
@@ -50,6 +50,7 @@ echo "tmpdir:$tempdir"
jar_path=e2e-test/agent-plan-compatibility-test/target/flink-agents*.jar
run_test "Agent plan compatibility end-to-end test" "bash
e2e-test/test-scripts/test_agent_plan_compatibility.sh $tempdir $jar_path"
+run_test "Cross-Language Config Option end-to-end test" "bash
e2e-test/test-scripts/test_java_config_in_python.sh"
printf "\n$PASSED/$TOTAL bash e2e-tests passed\n"