This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 42aea1c  [api][runtime] Introduce configuration mechanism (#122)
42aea1c is described below

commit 42aea1cc26cba4704eef33c21f55149ebdc6aa1b
Author: Eugene <[email protected]>
AuthorDate: Thu Sep 4 15:20:38 2025 +0800

    [api][runtime] Introduce configuration mechanism (#122)
---
 .../agents/api/AgentsExecutionEnvironment.java     |   8 +
 .../api/configuration/AgentConfigOptions.java      |  26 +++
 .../agents/api/configuration/ConfigOption.java     | 102 +++++++++
 .../agents/api/configuration/Configuration.java    |  24 +++
 .../api/configuration/ReadableConfiguration.java   |  84 ++++++++
 .../api/configuration/WritableConfiguration.java   |  77 +++++++
 .../flink/agents/api/context/RunnerContext.java    |   8 +
 .../test-scripts/test_java_config_in_python.sh     |  40 ++++
 .../flink/agents/plan/AgentConfiguration.java      | 176 ++++++++++++++++
 .../org/apache/flink/agents/plan/AgentPlan.java    |  30 +++
 .../plan/serializer/AgentPlanJsonDeserializer.java |  16 +-
 .../plan/serializer/AgentPlanJsonSerializer.java   |  19 ++
 .../flink/agents/plan/AgentConfigurationTest.java  | 218 ++++++++++++++++++++
 .../serializer/AgentPlanJsonDeserializerTest.java  |  10 +
 .../serializer/AgentPlanJsonSerializerTest.java    |  11 +-
 .../src/test/resources/agent_plans/agent_plan.json |   8 +
 python/flink_agents/api/configuration.py           | 169 +++++++++++++++
 python/flink_agents/api/core_options.py            |  70 +++++++
 python/flink_agents/api/execution_environment.py   |  11 +
 python/flink_agents/api/runner_context.py          |  11 +
 python/flink_agents/plan/agent_plan.py             |   7 +-
 python/flink_agents/plan/configuration.py          | 173 ++++++++++++++++
 python/flink_agents/plan/resource_provider.py      |  26 ++-
 ...py => create_python_option_from_java_option.py} |  35 ++--
 .../compatibility/generate_agent_plan_json.py      |   3 +-
 .../plan/tests/resources/agent_plan.json           |   5 +
 python/flink_agents/plan/tests/test_agent_plan.py  |  11 +-
 .../flink_agents/plan/tests/test_configuration.py  | 228 +++++++++++++++++++++
 .../flink_agents/runtime/flink_runner_context.py   |  12 ++
 .../runtime/local_execution_environment.py         |  19 +-
 python/flink_agents/runtime/local_runner.py        |  19 +-
 .../runtime/remote_execution_environment.py        |  29 ++-
 python/pyproject.toml                              |   1 +
 .../agents/runtime/context/RunnerContextImpl.java  |   6 +
 .../runtime/env/RemoteExecutionEnvironment.java    |  49 ++++-
 .../env/RemoteExecutionEnvironmentTest.java        |  77 +++++++
 .../flink/agents/runtime/memory/MemoryRefTest.java |   6 +
 tools/e2e.sh                                       |   1 +
 38 files changed, 1777 insertions(+), 48 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java 
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
index e01e0a4..efac149 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.agents.api;
 
+import org.apache.flink.agents.api.configuration.Configuration;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -86,6 +87,13 @@ public abstract class AgentsExecutionEnvironment {
         return getExecutionEnvironment(null);
     }
 
+    /**
+     * Returns the configuration of this execution environment, which supports both reading and
+     * writing configuration values.
+     *
+     * @return the {@link Configuration} instance used to read and modify configuration settings
+     */
+    public abstract Configuration getConfig();
+
     /**
      * Set input for agents from a list. Used for local execution.
      *
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
new file mode 100644
index 0000000..49e476d
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/** The set of predefined configuration options for agent parameters. */
+public class AgentConfigOptions {
+
+    /**
+     * Specifies the directory for the FileEvent file. This is a string option stored under the key
+     * {@code baseLogDir}; it is declared with a {@code null} default value.
+     */
+    public static final ConfigOption<String> BASE_LOG_DIR =
+            new ConfigOption<>("baseLogDir", String.class, null);
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java 
b/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java
new file mode 100644
index 0000000..b43688e
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/ConfigOption.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+import java.util.Objects;
+
+/**
+ * A {@code ConfigOption} describes a configuration parameter: the key it is stored under, the
+ * expected type of its value, and an optional default value.
+ *
+ * <p>Equality and hash code are based solely on the key; the type and the default value are not
+ * compared.
+ *
+ * @param <T> the type of the configuration value
+ */
+public class ConfigOption<T> {
+    private final String key;
+    private final Class<T> type;
+    private final T defaultValue;
+
+    /**
+     * Constructs a new configuration option.
+     *
+     * @param key The configuration key name; must not be null
+     * @param type The expected type of the configuration value; must not be null
+     * @param defaultValue The default value for this configuration option (can be null)
+     * @throws NullPointerException if {@code key} or {@code type} is null
+     */
+    public ConfigOption(String key, Class<T> type, T defaultValue) {
+        this.key = Objects.requireNonNull(key);
+        this.type = Objects.requireNonNull(type);
+        this.defaultValue = defaultValue;
+    }
+
+    /**
+     * Gets the configuration key.
+     *
+     * @return the key, never null
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * Gets the expected type of the configuration value.
+     *
+     * @return the type, never null
+     */
+    public Class<T> getType() {
+        return type;
+    }
+
+    /**
+     * Gets the expected type name of the configuration value, as reported by {@link
+     * Class#getTypeName()}.
+     *
+     * @return the type name
+     */
+    public String getTypeName() {
+        return type.getTypeName();
+    }
+
+    /**
+     * Gets the default value of this configuration option.
+     *
+     * @return the default value, or null if not set
+     */
+    public T getDefaultValue() {
+        return defaultValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigOption<?> that = (ConfigOption<?>) o;
+        return key.equals(that.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return key.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ConfigOption{"
+                + "key='"
+                + key
+                + '\''
+                + ", type="
+                + type.getName()
+                + ", defaultValue="
+                + defaultValue
+                + '}';
+    }
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
new file mode 100644
index 0000000..743a939
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/Configuration.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/**
+ * A configuration interface that provides both read and write access. Combines the capabilities
+ * of {@link ReadableConfiguration} and {@link WritableConfiguration} and declares no additional
+ * members of its own.
+ */
+public interface Configuration extends ReadableConfiguration, 
WritableConfiguration {}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
new file mode 100644
index 0000000..8287629
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/ReadableConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/**
+ * Read access to a configuration object.
+ *
+ * <p>Values can be looked up either by a string key together with a caller-supplied default value,
+ * or through a {@link ConfigOption} that describes the parameter.
+ */
+public interface ReadableConfiguration {
+    /**
+     * Get the integer configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The integer value associated with the key or the default value
+     */
+    Integer getInt(String key, Integer defaultValue);
+
+    /**
+     * Get the long configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The long value associated with the key or the default value
+     */
+    Long getLong(String key, Long defaultValue);
+
+    /**
+     * Get the float configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The float value associated with the key or the default value
+     */
+    Float getFloat(String key, Float defaultValue);
+
+    /**
+     * Get the double configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The double value associated with the key or the default value
+     */
+    Double getDouble(String key, Double defaultValue);
+
+    /**
+     * Get the boolean configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The boolean value associated with the key or the default value
+     */
+    Boolean getBool(String key, Boolean defaultValue);
+
+    /**
+     * Get the string configuration value by key.
+     *
+     * @param key The configuration key to retrieve
+     * @param defaultValue The default value to return if key is not found
+     * @return The string value associated with the key or the default value
+     */
+    String getStr(String key, String defaultValue);
+
+    /**
+     * Get the configuration value by ConfigOption.
+     *
+     * @param option The metadata of the option to read
+     * @param <T> The type of the configuration value
+     * @return The value of the given option
+     */
+    <T> T get(ConfigOption<T> option);
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
new file mode 100644
index 0000000..1c10165
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/WritableConfiguration.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/**
+ * Write access to a configuration object.
+ *
+ * <p>Values can be set either under a string key through one of the typed setters, or through a
+ * {@link ConfigOption} that describes the parameter.
+ */
+public interface WritableConfiguration {
+    /**
+     * Set the string configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The string value to set for the key
+     */
+    void setStr(String key, String value);
+
+    /**
+     * Set the integer configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The integer value to set for the key
+     */
+    void setInt(String key, int value);
+
+    /**
+     * Set the long configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The long value to set for the key
+     */
+    void setLong(String key, long value);
+
+    /**
+     * Set the float configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The float value to set for the key
+     */
+    void setFloat(String key, float value);
+
+    /**
+     * Set the double configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The double value to set for the key
+     */
+    void setDouble(String key, double value);
+
+    /**
+     * Set the boolean configuration value using the key.
+     *
+     * @param key The configuration key to set
+     * @param value The boolean value to set for the key
+     */
+    void setBool(String key, boolean value);
+
+    /**
+     * Set the configuration value using the ConfigOption.
+     *
+     * @param option The config option to set
+     * @param value The value to set for the option
+     * @param <T> The type of the configuration value
+     */
+    <T> void set(ConfigOption<T> option, T value);
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java 
b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
index 5776b09..67bf13a 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
@@ -18,6 +18,7 @@
 package org.apache.flink.agents.api.context;
 
 import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
 import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
@@ -65,4 +66,11 @@ public interface RunnerContext {
      * @throws Exception if the resource cannot be found or created
      */
     Resource getResource(String name, ResourceType type) throws Exception;
+
+    /**
+     * Gets the configuration for Flink Agents.
+     *
+     * @return the configuration for Flink Agents.
+     */
+    ReadableConfiguration getConfig();
 }
diff --git a/e2e-test/test-scripts/test_java_config_in_python.sh 
b/e2e-test/test-scripts/test_java_config_in_python.sh
new file mode 100644
index 0000000..5302283
--- /dev/null
+++ b/e2e-test/test-scripts/test_java_config_in_python.sh
@@ -0,0 +1,40 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+root_dir=$(pwd)
+
+echo $root_dir
+
+python_script_path=$root_dir/python/flink_agents/plan/tests/compatibility
+
+function test_create_python_option_from_java_option {
+  python $python_script_path/create_python_option_from_java_option.py
+
+  ret=$?
+  if [ "$ret" != "0" ]
+  then
+    echo "There is failure when create python option from java option, please 
check the log for details."
+    rm -f $json_path
+    exit $ret
+  fi
+
+  rm -f $json_path
+}
+
+test_create_python_option_from_java_option
\ No newline at end of file
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java
new file mode 100644
index 0000000..34d6efa
--- /dev/null
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentConfiguration.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.apache.flink.agents.api.configuration.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Agent configuration which stores key/value pairs in a flat map.
+ *
+ * <p>When constructed from an existing map, nested maps are flattened: the keys along the path to
+ * a value are joined with {@code "."} to form the key of that value.
+ *
+ * <p>The typed getters that take a string key convert the stored value via its string
+ * representation and return the supplied default when no value is stored for the key. {@link
+ * #get(ConfigOption)} returns the option's default value when no value is stored, returns the
+ * stored value as-is when it is already an instance of the option's type, and otherwise converts
+ * it via its string representation for String, Integer, Long, Float, Double and Boolean options;
+ * any other target type results in a {@link ClassCastException}.
+ *
+ * <p>{@link #set(ConfigOption, Object)} ignores a {@code null} value when the option declares a
+ * non-null default value.
+ */
+public class AgentConfiguration implements Configuration {
+    private final Map<String, Object> confData;
+
+    public AgentConfiguration() {
+        this.confData = new HashMap<>();
+    }
+
+    public AgentConfiguration(Map<String, Object> confData) {
+        this.confData = flatten(confData, "", ".");
+    }
+
+    public Map<String, Object> getConfData() {
+        return confData;
+    }
+
+    @Override
+    public void setStr(String key, String value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setInt(String key, int value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setLong(String key, long value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setFloat(String key, float value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setDouble(String key, double value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public void setBool(String key, boolean value) {
+        confData.put(key, value);
+    }
+
+    @Override
+    public <T> void set(ConfigOption<T> option, T value) {
+        if (value == null && option.getDefaultValue() != null) {
+            return;
+        }
+        confData.put(option.getKey(), value);
+    }
+
+    @Override
+    public Integer getInt(String key, Integer defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Integer::parseInt)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public Long getLong(String key, Long defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Long::parseLong)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public Float getFloat(String key, Float defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Float::parseFloat)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public Double getDouble(String key, Double defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Double::parseDouble)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public Boolean getBool(String key, Boolean defaultValue) {
+        return Optional.ofNullable(confData.get(key))
+                .map(Object::toString)
+                .map(Boolean::valueOf)
+                .orElse(defaultValue);
+    }
+
+    @Override
+    public String getStr(String key, String defaultValue) {
+        return 
Optional.ofNullable(confData.get(key)).map(Object::toString).orElse(defaultValue);
+    }
+
+    @Override
+    public <T> T get(ConfigOption<T> option) {
+        Object rawValue = confData.get(option.getKey());
+        if (rawValue == null) {
+            return option.getDefaultValue();
+        }
+
+        Class<T> targetType = option.getType();
+
+        if (targetType.isAssignableFrom(rawValue.getClass())) {
+            return targetType.cast(rawValue);
+        } else if (String.class.equals(targetType)) {
+            return targetType.cast(rawValue.toString());
+        } else if (Integer.class.equals(targetType)) {
+            return targetType.cast(Integer.parseInt(rawValue.toString()));
+        } else if (Long.class.equals(targetType)) {
+            return targetType.cast(Long.parseLong(rawValue.toString()));
+        } else if (Float.class.equals(targetType)) {
+            return targetType.cast(Float.parseFloat(rawValue.toString()));
+        } else if (Double.class.equals(targetType)) {
+            return targetType.cast(Double.parseDouble(rawValue.toString()));
+        } else if (Boolean.class.equals(targetType)) {
+            return targetType.cast(Boolean.parseBoolean(rawValue.toString()));
+        } else {
+            throw new ClassCastException(
+                    "Unsupported type conversion from "
+                            + rawValue.getClass().getName()
+                            + " to "
+                            + targetType.getName());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> flatten(
+            Map<String, Object> config, String keyPrefix, String keySeparator) 
{
+        final Map<String, Object> flattenedMap = new HashMap<>();
+
+        config.forEach(
+                (key, value) -> {
+                    String flattenedKey = keyPrefix + key;
+                    if (value instanceof Map) {
+                        Map<String, Object> e = (Map<String, Object>) value;
+                        flattenedMap.putAll(flatten(e, flattenedKey + 
keySeparator, keySeparator));
+                    } else {
+                        flattenedMap.put(flattenedKey, value);
+                    }
+                });
+
+        return flattenedMap;
+    }
+}
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index b3bab29..3efee14 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -61,6 +61,8 @@ public class AgentPlan implements Serializable {
     /** Two-level mapping of resource type to resource name to resource 
provider. */
     private Map<ResourceType, Map<String, ResourceProvider>> resourceProviders;
 
+    private AgentConfiguration config;
+
     /** Cache for instantiated resources. */
     private transient Map<ResourceType, Map<String, Resource>> resourceCache;
 
@@ -68,6 +70,7 @@ public class AgentPlan implements Serializable {
         this.actions = actions;
         this.actionsByEvent = actionsByEvent;
         this.resourceProviders = new HashMap<>();
+        this.config = new AgentConfiguration();
         this.resourceCache = new ConcurrentHashMap<>();
     }
 
@@ -79,6 +82,19 @@ public class AgentPlan implements Serializable {
         this.actionsByEvent = actionsByEvent;
         this.resourceProviders = resourceProviders;
         this.resourceCache = new ConcurrentHashMap<>();
+        this.config = new AgentConfiguration();
+    }
+
+    public AgentPlan(
+            Map<String, Action> actions,
+            Map<String, List<Action>> actionsByEvent,
+            Map<ResourceType, Map<String, ResourceProvider>> resourceProviders,
+            AgentConfiguration config) {
+        this.actions = actions;
+        this.actionsByEvent = actionsByEvent;
+        this.resourceProviders = resourceProviders;
+        this.resourceCache = new ConcurrentHashMap<>();
+        this.config = config;
     }
 
     /**
@@ -89,9 +105,14 @@ public class AgentPlan implements Serializable {
      * @throws Exception if there's an error creating actions from the agent
      */
     public AgentPlan(Agent agent) throws Exception {
+        this(agent, new AgentConfiguration());
+    }
+
+    public AgentPlan(Agent agent, AgentConfiguration config) throws Exception {
         this(new HashMap<>(), new HashMap<>());
         extractActionsFromAgent(agent);
         extractResourceProvidersFromAgent(agent);
+        this.config = config;
     }
 
     public Map<String, Action> getActions() {
@@ -141,6 +162,14 @@ public class AgentPlan implements Serializable {
         return resource;
     }
 
+    public AgentConfiguration getConfig() {
+        return config;
+    }
+
+    public Map<String, Object> getConfigData() {
+        return config.getConfData();
+    }
+
     private void writeObject(ObjectOutputStream out) throws IOException {
         String serializedStr = new ObjectMapper().writeValueAsString(this);
         out.writeUTF(serializedStr);
@@ -152,6 +181,7 @@ public class AgentPlan implements Serializable {
         this.actions = agentPlan.getActions();
         this.actionsByEvent = agentPlan.getActionsByEvent();
         this.resourceProviders = agentPlan.getResourceProviders();
+        this.config = agentPlan.getConfig();
         this.resourceCache = new ConcurrentHashMap<>();
     }
 
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
index f278678..5b8e8f8 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.agents.plan.serializer;
 
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JacksonException;
@@ -29,6 +30,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Deseriali
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
@@ -119,6 +121,18 @@ public class AgentPlanJsonDeserializer extends 
StdDeserializer<AgentPlan> {
             }
         }
 
-        return new AgentPlan(actions, actionsByEvent, resourceProviders);
+        // Deserialize config data
+        JsonNode configNode = node.get("config");
+        Map<String, Object> configData = new HashMap<>();
+        if (configNode != null && configNode.isObject()) {
+            JsonNode configDataNode = configNode.get("conf_data");
+            if (configDataNode != null && configDataNode.isObject()) {
+                ObjectMapper mapper = new ObjectMapper();
+                configData = mapper.convertValue(configDataNode, Map.class);
+            }
+        }
+        AgentConfiguration config = new AgentConfiguration(configData);
+
+        return new AgentPlan(actions, actionsByEvent, resourceProviders, 
config);
     }
 }
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
index 05f2ea6..ca61c79 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializer.java
@@ -105,5 +105,24 @@ public class AgentPlanJsonSerializer extends 
StdSerializer<AgentPlan> {
             jsonGenerator.writeEndObject();
         }
         jsonGenerator.writeEndObject();
+
+        // Serialize config data
+        jsonGenerator.writeFieldName("config");
+        jsonGenerator.writeStartObject();
+        jsonGenerator.writeFieldName("conf_data");
+        jsonGenerator.writeStartObject();
+        agentPlan
+                .getConfigData()
+                .forEach(
+                        (key, value) -> {
+                            try {
+                                jsonGenerator.writeFieldName(key);
+                                jsonGenerator.writeObject(value);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        jsonGenerator.writeEndObject();
+        jsonGenerator.writeEndObject();
     }
 }
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java
new file mode 100644
index 0000000..125c737
--- /dev/null
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentConfigurationTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.plan;
+
+import org.apache.flink.agents.api.configuration.ConfigOption;
+import org.junit.jupiter.api.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AgentConfigurationTest {
+
+    @Test
+    void testGetInt() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("int_key", 42);
+        data.put("str_key", "123");
+        data.put("invalid_key", "not_an_int");
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertEquals(Integer.valueOf(42), config.getInt("int_key", null));
+        assertEquals(Integer.valueOf(123), config.getInt("str_key", null));
+        assertNull(config.getInt("missing_key", null));
+        assertEquals(Integer.valueOf(999), config.getInt("missing_key", 999));
+
+        assertThrows(
+                NumberFormatException.class,
+                () -> {
+                    config.getInt("invalid_key", null);
+                });
+    }
+
+    @Test
+    void testGetLong() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("long_key", 123456789012L);
+        data.put("str_long_key", "9876543210");
+        data.put("invalid_long_key", "not_a_long");
+
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertEquals(Long.valueOf(123456789012L), config.getLong("long_key", 
null));
+
+        assertEquals(Long.valueOf(9876543210L), config.getLong("str_long_key", 
null));
+
+        assertNull(config.getLong("missing_key", null));
+
+        assertEquals(Long.valueOf(999999999999L), 
config.getLong("missing_key", 999999999999L));
+
+        assertThrows(
+                NumberFormatException.class,
+                () -> {
+                    config.getLong("invalid_long_key", null);
+                });
+    }
+
+    @Test
+    void testGetFloat() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("float_key", 3.14f);
+        data.put("int_key", 42);
+        data.put("str_key", "2.5");
+        data.put("invalid_key", "not_a_float");
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertEquals(Float.valueOf(3.14f), config.getFloat("float_key", null));
+        assertEquals(Float.valueOf(42.0f), config.getFloat("int_key", null));
+        assertEquals(Float.valueOf(2.5f), config.getFloat("str_key", null));
+        assertNull(config.getFloat("missing_key", null));
+        assertEquals(Float.valueOf(1.23f), config.getFloat("missing_key", 
1.23f));
+
+        assertThrows(
+                NumberFormatException.class,
+                () -> {
+                    config.getFloat("invalid_key", null);
+                });
+    }
+
+    @Test
+    void testGetDouble() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("double_key", 3.14);
+        data.put("int_key", 42);
+        data.put("str_key", "2.5");
+        data.put("invalid_key", "not_a_double");
+
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertEquals(Double.valueOf(3.14), config.getDouble("double_key", 
null));
+
+        assertEquals(Double.valueOf(42.0), config.getDouble("int_key", null));
+
+        assertEquals(Double.valueOf(2.5), config.getDouble("str_key", null));
+
+        assertNull(config.getDouble("missing_key", null));
+
+        assertEquals(Double.valueOf(1.23), config.getDouble("missing_key", 
1.23));
+
+        assertThrows(
+                NumberFormatException.class,
+                () -> {
+                    config.getDouble("invalid_key", null);
+                });
+    }
+
+    @Test
+    void testGetBool() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("true_key", true);
+        data.put("false_key", false);
+        data.put("str_key", "true");
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertTrue(config.getBool("true_key", false));
+        assertFalse(config.getBool("false_key", true));
+        assertNull(config.getBool("missing_key", null));
+        assertTrue(config.getBool("missing_key", true));
+
+        // Note: Boolean.valueOf("true") is true in Java
+        assertTrue(config.getBool("str_key", null));
+    }
+
+    @Test
+    void testGetStr() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("str_key", "hello");
+        data.put("int_key", 42);
+        data.put("float_key", 3.14);
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        assertEquals("hello", config.getStr("str_key", null));
+        assertEquals("42", config.getStr("int_key", null));
+        assertEquals("3.14", config.getStr("float_key", null));
+        assertNull(config.getStr("missing_key", null));
+        assertEquals("default", config.getStr("missing_key", "default"));
+    }
+
+    @Test
+    void testGetWithConfigOption() {
+        Map<String, Object> data = new HashMap<>();
+        data.put("config.str", "config.value");
+        data.put("config.int", 6789);
+        data.put("config.float", "45.5");
+        data.put("config.boolean", true);
+
+        AgentConfiguration config = new AgentConfiguration(data);
+
+        ConfigOption<String> strOption =
+                new ConfigOption<>("config.str", String.class, "default_str");
+        ConfigOption<Integer> intOption = new ConfigOption<>("config.int", 
Integer.class, 123);
+        ConfigOption<Long> longOption = new ConfigOption<>("config.int", 
Long.class, 123L);
+        ConfigOption<Float> floatOption = new ConfigOption<>("config.float", 
Float.class, 0.0f);
+        ConfigOption<Double> doubleOption = new ConfigOption<>("config.float", 
Double.class, 0.0);
+        ConfigOption<Boolean> boolOption =
+                new ConfigOption<>("config.boolean", Boolean.class, false);
+
+        assertEquals("config.value", config.get(strOption));
+        assertEquals(Integer.valueOf(6789), config.get(intOption));
+        assertEquals(Long.valueOf(6789L), config.get(longOption));
+        assertEquals(Float.valueOf(45.5f), config.get(floatOption));
+        assertEquals(Double.valueOf(45.5), config.get(doubleOption));
+        assertEquals(Boolean.TRUE, config.get(boolOption));
+
+        ConfigOption<Integer> missingOption = new 
ConfigOption<>("missing.key1", Integer.class, 22);
+        assertEquals(Integer.valueOf(22), config.get(missingOption));
+
+        ConfigOption<Integer> missingKey = new ConfigOption<>("missing.key2", 
Integer.class, null);
+        assertNull(config.get(missingKey));
+    }
+
+    @Test
+    void testGetWithDefaultValue() {
+        ConfigOption<String> defaultStr =
+                new ConfigOption<>("default.str", String.class, 
"default_value");
+        ConfigOption<Integer> defaultInt = new ConfigOption<>("default.int", 
Integer.class, 100);
+        ConfigOption<Double> defaultDouble =
+                new ConfigOption<>("default.double", Double.class, 2.5);
+
+        AgentConfiguration config = new AgentConfiguration();
+
+        assertEquals("default_value", config.get(defaultStr));
+        assertEquals(Integer.valueOf(100), config.get(defaultInt));
+        assertEquals(Double.valueOf(2.5), config.get(defaultDouble));
+    }
+
+    @Test
+    void testGetWithNullAndDefault() {
+        ConfigOption<String> nullableStr =
+                new ConfigOption<>("nullable.str", String.class, "default");
+
+        AgentConfiguration config = new AgentConfiguration();
+        config.setStr("nullable.str", null);
+
+        assertEquals("default", config.get(nullableStr));
+    }
+}
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
index 8bb231f..2cd3018 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonDeserializerTest.java
@@ -28,7 +28,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -63,6 +65,14 @@ public class AgentPlanJsonDeserializerTest {
                 agentPlan.getActionsByEvent().get(InputEvent.class.getName()));
         assertEquals(
                 List.of(secondAction), 
agentPlan.getActionsByEvent().get(MyEvent.class.getName()));
+
+        // Check the flink agent config
+        Map<String, Object> configData = agentPlan.getConfigData();
+        assertThat(configData.keySet()).hasSize(4);
+        assertEquals(1, configData.get("key1"));
+        assertEquals(1.5, configData.get("key2"));
+        assertEquals(true, configData.get("key3"));
+        assertEquals("v1", configData.get("key4"));
     }
 
     private static class MyEvent extends Event {}
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
index c703cc4..ec18751 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/serializer/AgentPlanJsonSerializerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.plan.Action;
+import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.JavaFunction;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -190,8 +191,12 @@ public class AgentPlanJsonSerializerTest {
         // Create a test agent
         TestAgent agent = new TestAgent();
 
+        Map<String, Object> confData = new HashMap<>();
+        confData.put("config.key", "config.value");
+        AgentConfiguration agentConfiguration = new 
AgentConfiguration(confData);
+
         // Create AgentPlan from the agent
-        AgentPlan agentPlan = new AgentPlan(agent);
+        AgentPlan agentPlan = new AgentPlan(agent, agentConfiguration);
 
         // Serialize the agent plan to JSON
         String json = new ObjectMapper().writeValueAsString(agentPlan);
@@ -199,6 +204,7 @@ public class AgentPlanJsonSerializerTest {
         // Verify the JSON contains the expected fields
         assertThat(json).contains("\"actions\":{");
         assertThat(json).contains("\"actions_by_event\":{");
+        assertThat(json).contains("\"config\":{");
 
         // Verify that the actions from @Action annotated methods are present
         assertThat(json).contains("\"handleInputEvent\"");
@@ -217,5 +223,8 @@ public class AgentPlanJsonSerializerTest {
 
         // Verify that regularMethod is not included (not annotated)
         assertThat(json).doesNotContain("\"regularMethod\"");
+
+        // Verify that config data from AgentConfiguration is present
+        
assertThat(json).contains("\"conf_data\":{\"config.key\":\"config.value\"}");
     }
 }
diff --git a/plan/src/test/resources/agent_plans/agent_plan.json 
b/plan/src/test/resources/agent_plans/agent_plan.json
index 31af952..e66e20e 100644
--- a/plan/src/test/resources/agent_plans/agent_plan.json
+++ b/plan/src/test/resources/agent_plans/agent_plan.json
@@ -34,5 +34,13 @@
     
"org.apache.flink.agents.plan.serializer.AgentPlanJsonDeserializerTest$MyEvent":
 [
       "second_action"
     ]
+  },
+  "config": {
+    "conf_data": {
+      "key1": 1,
+      "key2": 1.5,
+      "key3": true,
+      "key4": "v1"
+    }
   }
 }
diff --git a/python/flink_agents/api/configuration.py 
b/python/flink_agents/api/configuration.py
new file mode 100644
index 0000000..15d2766
--- /dev/null
+++ b/python/flink_agents/api/configuration.py
@@ -0,0 +1,169 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from abc import ABC, abstractmethod
+from typing import Any, Optional, Type
+
+
+class ConfigOption:
+    """A configuration option defines a configuration key with its type and 
default
+    value.
+
+    Args:
+        key: The configuration key name
+        config_type: The expected type of the configuration value (int, float, 
str,
+        or bool)
+        default: The default value for this configuration option
+    """
+
+    def __init__(self, key: str, config_type: Type[Any], default: 
Optional[Any]=None) -> None:
+        """Initialize a configuration option."""
+        self._key = key
+        self._type = config_type
+        self._default_value = default
+    def get_key(self) -> str:
+        """Gets the configuration key."""
+        return self._key
+
+    def get_type(self) -> Type[Any]:
+        """Returns the type of the configuration value."""
+        return self._type
+
+    def get_default_value(self) -> Any:
+        """Returns the default value."""
+        return self._default_value
+
+class WritableConfiguration(ABC):
+    """Abstract base class providing write access to a configuration object.
+
+    This class enables modification of configuration settings.
+    """
+
+    @abstractmethod
+    def set_str(self, key: str, value: str) -> None:
+        """Set the string configuration value using the key.
+
+        Args:
+            key: The configuration key to set
+            value: The string value to set for the key
+        """
+
+    @abstractmethod
+    def set_int(self, key: str, value: int) -> None:
+        """Set the int configuration value using the key.
+
+        Args:
+            key: The configuration key to set
+            value: The integer value to set for the key
+        """
+
+    @abstractmethod
+    def set_float(self, key: str, value: float) -> None:
+        """Set the float configuration value using the key.
+
+        Args:
+            key: The configuration key to set
+            value: The float value to set for the key
+        """
+
+    @abstractmethod
+    def set_bool(self, key: str, value: bool) -> None:  # noqa: FBT001
+        """Set the boolean configuration value using the key.
+
+        Args:
+            key: The configuration key to set
+            value: The boolean value to set for the key
+        """
+
+    @abstractmethod
+    def set(self, option: ConfigOption, value: Any) -> None:
+        """Set the configuration value using the ConfigOption.
+
+        Args:
+            option: The config option to set
+            value: The value to set for the key
+        """
+
+class ReadableConfiguration(ABC):
+    """Abstract base class providing read access to a configuration object.
+
+    This class enables retrieval of configuration settings.
+    """
+
+    @abstractmethod
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        """Get the int configuration value by key.
+
+        Args:
+            key: The configuration key to retrieve
+            default: The default value to return if key is not found
+
+        Returns:
+            The integer value associated with the key or the default value
+        """
+
+    @abstractmethod
+    def get_float(self, key: str, default: Optional[float]=None) -> float:
+        """Get the float configuration value by key.
+
+        Args:
+            key: The configuration key to retrieve
+            default: The default value to return if key is not found
+
+        Returns:
+            The float value associated with the key or the default value
+        """
+
+    @abstractmethod
+    def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+        """Get the boolean configuration value by key.
+
+        Args:
+            key: The configuration key to retrieve
+            default: The default value to return if key is not found
+
+        Returns:
+            The boolean value associated with the key or the default value
+        """
+
+    @abstractmethod
+    def get_str(self, key: str, default: Optional[str]=None) -> str:
+        """Get the string configuration value by key.
+
+        Args:
+            key: The configuration key to retrieve
+            default: The default value to return if key is not found
+
+        Returns:
+            The string value associated with the key or the default value
+        """
+
+    @abstractmethod
+    def get(self, option: ConfigOption) -> Any:
+        """Get the configuration value by ConfigOption.
+
+        Args:
+            option: The metadata of the option to read
+
+        Returns:
+            The value of the given option
+        """
+
+class Configuration(WritableConfiguration, ReadableConfiguration, ABC):
+    """A configuration object that provides both read and write access to a
+    configuration object.
+    """
diff --git a/python/flink_agents/api/core_options.py 
b/python/flink_agents/api/core_options.py
new file mode 100644
index 0000000..5599e24
--- /dev/null
+++ b/python/flink_agents/api/core_options.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any
+
+from pyflink.java_gateway import get_gateway
+
+from flink_agents.api.configuration import ConfigOption
+
+
+def covert_j_option_to_python_option(j_option: Any) -> ConfigOption:
+    """Convert a Java config option to a Python config option."""
+    key = j_option.getKey()
+    default = j_option.getDefaultValue()
+    type_name = j_option.getTypeName()
+
+    if type_name == "java.lang.String":
+        config_type = str
+    elif type_name == "java.lang.Integer":
+        config_type = int
+    elif type_name == "java.lang.Long":
+        config_type = int
+    elif type_name == "java.lang.Boolean":
+        config_type = bool
+    elif type_name == "java.lang.Float":
+        config_type = float
+    elif type_name == "java.lang.Double":
+        config_type = float
+    else:
+        msg = f"Unsupported type: {type_name}"
+        raise TypeError(msg)
+
+    return ConfigOption(key, config_type, default)
+
+
+class AgentConfigOptionsMeta(type):
+    """Metaclass for AgentConfigOptions."""
+    def __init__(cls, name: str, bases: tuple[type, ...], attrs: dict[str, 
Any]) -> None:
+        """Initialize the metaclass for AgentConfigOptions."""
+        super().__init__(name, bases, attrs)
+
+        jvm = get_gateway().jvm
+        cls.jvm = jvm
+
+    def __getattr__(cls, item: str) -> ConfigOption:
+        j_option = getattr(
+            
cls.jvm.org.apache.flink.agents.api.configuration.AgentConfigOptions,
+            item,
+        )
+
+        python_option = covert_j_option_to_python_option(j_option)
+        return python_option
+
+
+class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
+    """CoreOptions to manage core configuration parameters for Flink Agents."""
diff --git a/python/flink_agents/api/execution_environment.py 
b/python/flink_agents/api/execution_environment.py
index e6d9efd..b2eef8c 100644
--- a/python/flink_agents/api/execution_environment.py
+++ b/python/flink_agents/api/execution_environment.py
@@ -24,6 +24,7 @@ from pyflink.datastream import DataStream, KeySelector, 
StreamExecutionEnvironme
 from pyflink.table import Schema, StreamTableEnvironment, Table
 
 from flink_agents.api.agent import Agent
+from flink_agents.api.configuration import Configuration
 
 
 class AgentBuilder(ABC):
@@ -108,6 +109,16 @@ class AgentsExecutionEnvironment(ABC):
                 "flink_agents.runtime.remote_execution_environment"
             ).create_instance(env=env, **kwargs)
 
+    @abstractmethod
+    def get_config(self, path: Optional[str] = None) -> Configuration:
+        """Get the writable configuration for flink agents.
+
+        Returns:
+        -------
+        Configuration
+            The configuration for flink agents.
+        """
+
     @abstractmethod
     def from_list(self, input: List[Dict[str, Any]]) -> AgentBuilder:
         """Set input for agents. Used for local execution.
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/runner_context.py
index 03e7954..c822a49 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -18,6 +18,7 @@
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
 
+from flink_agents.api.configuration import ReadableConfiguration
 from flink_agents.api.events.event import Event
 from flink_agents.api.metric_group import MetricGroup
 from flink_agents.api.resource import Resource, ResourceType
@@ -107,3 +108,13 @@ class RunnerContext(ABC):
         Any
             The result of the function.
         """
+
+    @abstractmethod
+    def get_config(self) -> ReadableConfiguration:
+        """Get the readable configuration for flink agents.
+
+        Returns:
+        -------
+        ReadableConfiguration
+            The configuration for flink agents.
+        """
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index 83f4ef8..c697fc3 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -24,6 +24,7 @@ from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
 from flink_agents.plan.actions.tool_call_action import TOOL_CALL_ACTION
+from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.plan.function import PythonFunction
 from flink_agents.plan.resource_provider import (
     JavaResourceProvider,
@@ -53,6 +54,7 @@ class AgentPlan(BaseModel):
     actions: Dict[str, Action]
     actions_by_event: Dict[str, List[str]]
     resource_providers: Optional[Dict[ResourceType, Dict[str, 
ResourceProvider]]] = None
+    config: Optional[AgentConfiguration] = None
     __resources: Dict[ResourceType, Dict[str, Resource]] = {}
 
     @field_serializer("resource_providers")
@@ -116,7 +118,7 @@ class AgentPlan(BaseModel):
         return self
 
     @staticmethod
-    def from_agent(agent: Agent) -> "AgentPlan":
+    def from_agent(agent: Agent, config: AgentConfiguration) -> "AgentPlan":
         """Build a AgentPlan from user defined agent."""
         actions = {}
         actions_by_event = {}
@@ -142,6 +144,7 @@ class AgentPlan(BaseModel):
             actions=actions,
             actions_by_event=actions_by_event,
             resource_providers=resource_providers,
+            config=config,
         )
 
     def get_actions(self, event_type: str) -> List[Action]:
@@ -173,7 +176,7 @@ class AgentPlan(BaseModel):
             self.__resources[type] = {}
         if name not in self.__resources[type]:
             resource_provider = self.resource_providers[type][name]
-            resource = 
resource_provider.provide(get_resource=self.get_resource)
+            resource = 
resource_provider.provide(get_resource=self.get_resource, config=self.config)
             self.__resources[type][name] = resource
         return self.__resources[type][name]
 
diff --git a/python/flink_agents/plan/configuration.py 
b/python/flink_agents/plan/configuration.py
new file mode 100644
index 0000000..b327c13
--- /dev/null
+++ b/python/flink_agents/plan/configuration.py
@@ -0,0 +1,173 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+from typing import Any, Dict, Optional, Type
+
+import yaml
+from pydantic import BaseModel
+from typing_extensions import override
+
+from flink_agents.api.configuration import (
+    ConfigOption,
+    Configuration,
+)
+
+
+def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict[str, 
Any]:
+    """Flatten a nested dictionary into a single-level dictionary.
+
+    This function recursively traverses the dictionary, converting multi-level
+    nested key-value pairs into a single-level structure, where nested levels
+    are represented by joining key names with the specified separator.
+
+    Args:
+        d (Dict): The nested dictionary to be flattened
+        parent_key (str): The parent key name, used in recursion to track the
+                         upper-level key path. Defaults to an empty string.
+        sep (str): The separator used to join parent and child keys.
+                  Defaults to dot ('.').
+
+    Returns:
+        Dict[str, Any]: A flattened single-level dictionary where keys from
+                       the original nested structure are joined with the 
separator
+    """
+    items = {}
+    for k, v in d.items():
+        new_key = f"{parent_key}{sep}{k}" if parent_key else k
+        if isinstance(v, dict):
+            items.update(flatten_dict(v, new_key, sep=sep))
+        else:
+            items[new_key] = v
+    return items
+
+class AgentConfiguration(BaseModel, Configuration):
+    """Base class for config objects in the system.
+    Provides a flat dict interface to access nested config values.
+    """
+
+    conf_data: Dict[str, Any]
+
+    def __init__(self, conf_data: Optional[Dict[str, Any]] = None) -> None:
+        """Initialize with optional configuration data."""
+        if conf_data is None:
+            super().__init__(conf_data = {})
+        else:
+            super().__init__(conf_data = conf_data)
+
+    def get_value_with_type(self, key: str, config_type: Type[Any], default: 
Any) -> Any:
+        """Helper method for all the get_xxx functions to avoid duplicate code.
+
+        Args:
+            key: The configuration key to retrieve
+            config_type: The expected type of the configuration value (int, 
float, str,
+            or bool)
+            default: The default value to return if key is not found
+
+        Returns:
+            The value associated with the key or the default value
+        """
+        value = self.conf_data.get(key)
+        if value is None:
+            return default
+
+        try:
+            return config_type(value)
+        except (ValueError, TypeError) as e:
+            msg = f"Invalid value for {key}: {value}"
+            raise ValueError(msg) from e
+
+    @override
+    def get_int(self, key: str, default: Optional[int]=None) -> int:
+        return self.get_value_with_type(key, int, default)
+
+    @override
+    def get_float(self, key: str, default: Optional[float]=None) -> float:
+        return self.get_value_with_type(key, float, default)
+
+    @override
+    def get_bool(self, key: str, default: Optional[bool]=None) -> bool:
+        return self.get_value_with_type(key, bool, default)
+
+    @override
+    def get_str(self, key: str, default: Optional[str]=None) -> str:
+        return self.get_value_with_type(key, str, default)
+
+    @override
+    def get(self, option: ConfigOption) -> Any:
+        return self.get_value_with_type(option.get_key(), option.get_type(), 
option.get_default_value())
+
+    @override
+    def set_str(self, key: str, value: str) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_int(self, key: str, value: int) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_float(self, key: str, value: float) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set_bool(self, key: str, value: bool) -> None:
+        self.conf_data[key] = value
+
+    @override
+    def set(self, option: ConfigOption, value: Any) -> None:
+        self.conf_data[option.get_key()] = value
+
+    def load_from_file(self, config_path: Optional[str] = None) -> None:
+        """Load configuration from a YAML file and update current 
configuration data.
+
+        Args:
+            config_path (str, optional): Path to the configuration file.
+        """
+        if config_path:
+            path = Path(config_path)
+            with path.open() as f:
+                raw_config = yaml.safe_load(f)
+                self.conf_data.update(flatten_dict(raw_config.get('agent', 
{})))
+
+    def get_conf_data(self) -> dict:
+        """Get the configuration data dictionary.
+
+        Returns:
+            dict: A dictionary containing all configuration items
+        """
+        return self.conf_data
+
+    def get_config_data_by_prefix(self, prefix: str) -> dict:
+        """Extract configuration items for a specific prefix from the 
configuration
+        data.
+
+        Parameters:
+            prefix: The prefix for the key of configuration items.
+
+        Returns:
+            dict: A dictionary containing the configuration items whose
+            keys start with the prefix. The keys are the configuration item names with
+            the prefix removed, and the values are the corresponding 
configuration
+            values.
+        """
+        prefix = f"{prefix}."
+        result = {}
+        for key, value in self.conf_data.items():
+            if key.startswith(prefix):
+                sub_key = key[len(prefix) :]
+                result[sub_key] = value
+        return result
diff --git a/python/flink_agents/plan/resource_provider.py 
b/python/flink_agents/plan/resource_provider.py
index fbcaa2d..61a15dc 100644
--- a/python/flink_agents/plan/resource_provider.py
+++ b/python/flink_agents/plan/resource_provider.py
@@ -27,6 +27,7 @@ from flink_agents.api.resource import (
     ResourceType,
     SerializableResource,
 )
+from flink_agents.plan.configuration import AgentConfiguration
 
 
 class ResourceProvider(BaseModel, ABC):
@@ -45,13 +46,16 @@ class ResourceProvider(BaseModel, ABC):
     type: ResourceType
 
     @abstractmethod
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Create resource in runtime.
 
         Parameters
         ----------
         get_resource : Callable
             The helper function to get other resource declared in the same 
Agent.
+
+        config : AgentConfiguration
+            Configuration for Flink Agents.
         """
 
 
@@ -88,11 +92,21 @@ class PythonResourceProvider(ResourceProvider):
     clazz: str
     kwargs: Dict[str, Any]
 
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Create resource in runtime."""
         module = importlib.import_module(self.module)
         cls = getattr(module, self.clazz)
-        resource = cls(**self.kwargs, get_resource=get_resource)
+
+        # Later sources win on duplicate keys: the declared kwargs are
+        # overridden by configuration under "<clazz>.", which in turn is
+        # overridden by configuration under "<name>.".
+        by_class = config.get_config_data_by_prefix(self.clazz)
+        by_name = config.get_config_data_by_prefix(self.name)
+        merged_kwargs = {**self.kwargs, **by_class, **by_name}
+
+        resource = cls(**merged_kwargs, get_resource=get_resource)
         return resource
 
 
@@ -124,7 +138,7 @@ class 
PythonSerializableResourceProvider(SerializableResourceProvider):
             resource=resource,
         )
 
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Get or deserialize resource in runtime."""
         if self.resource is None:
             module = importlib.import_module(self.module)
@@ -140,7 +154,7 @@ class JavaResourceProvider(ResourceProvider):
     Currently, this class only used for deserializing Java agent plan json
     """
 
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Create resource in runtime."""
         err_msg = (
             "Currently, flink-agents doesn't support create resource "
@@ -156,7 +170,7 @@ class 
JavaSerializableResourceProvider(SerializableResourceProvider):
     Currently, this class only used for deserializing Java agent plan json
     """
 
-    def provide(self, get_resource: Callable) -> Resource:
+    def provide(self, get_resource: Callable, config: AgentConfiguration) -> 
Resource:
         """Get or deserialize resource in runtime."""
         err_msg = (
             "Currently, flink-agents doesn't support create resource "
diff --git 
a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py 
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
similarity index 50%
copy from 
python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
copy to 
python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
index cde375c..3aa094b 100644
--- a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
+++ 
b/python/flink_agents/plan/tests/compatibility/create_python_option_from_java_option.py
@@ -15,21 +15,28 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-import sys
 from pathlib import Path
 
-from flink_agents.plan.agent_plan import AgentPlan
-from 
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_agent
 import (
-    PythonAgentPlanCompatibilityTestAgent,
-)
+from pyflink.util.java_utils import add_jars_to_context_class_loader
 
-# The agent plan json will be checked by
-# flink-agents/plan/src/test/java/org/apache/flink/agents/plan
-# /compatibility/GenerateAgentPlanJson.java
-# correspond modification should be applied to it when modify this file.
+from flink_agents.api.core_options import AgentConfigOptions
+
+# This script is used to verify that Java-defined configuration options
+# (e.g., AgentConfigOptions) are correctly exposed and accessible in the
+# Python environment via the JAR file. It loads a Java JAR into the Python
+# context and performs basic assertions on the configuration keys, types,
+# and default values to ensure compatibility between Java and Python layers.
+#
+# The JAR file path is relative to this script and should be updated if
+# the build structure changes.
 if __name__ == "__main__":
-    json_path = sys.argv[1]
-    agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent())
-    json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
-    with Path(json_path).open("w") as f:
-        f.write(json_value)
+    current_dir = Path(__file__).parent
+    add_jars_to_context_class_loader(
+        [
+            
f"file:///{current_dir}/../../../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar"
+        ]
+    )
+
+    assert AgentConfigOptions.BASE_LOG_DIR.get_key() == "baseLogDir"
+    assert AgentConfigOptions.BASE_LOG_DIR.get_type() is str
+    assert AgentConfigOptions.BASE_LOG_DIR.get_default_value() is None
diff --git 
a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py 
b/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
index cde375c..24ad9cf 100644
--- a/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
+++ b/python/flink_agents/plan/tests/compatibility/generate_agent_plan_json.py
@@ -19,6 +19,7 @@ import sys
 from pathlib import Path
 
 from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
 from 
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_agent
 import (
     PythonAgentPlanCompatibilityTestAgent,
 )
@@ -29,7 +30,7 @@ from 
flink_agents.plan.tests.compatibility.python_agent_plan_compatibility_test_
 # correspond modification should be applied to it when modify this file.
 if __name__ == "__main__":
     json_path = sys.argv[1]
-    agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent())
+    agent_plan = AgentPlan.from_agent(PythonAgentPlanCompatibilityTestAgent(), 
AgentConfiguration())
     json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
     with Path(json_path).open("w") as f:
         f.write(json_value)
diff --git a/python/flink_agents/plan/tests/resources/agent_plan.json 
b/python/flink_agents/plan/tests/resources/agent_plan.json
index 86780bd..dd2a382 100644
--- a/python/flink_agents/plan/tests/resources/agent_plan.json
+++ b/python/flink_agents/plan/tests/resources/agent_plan.json
@@ -81,5 +81,10 @@
                 "__resource_provider_type__": "PythonResourceProvider"
             }
         }
+    },
+    "config": {
+        "conf_data": {
+            "mock.key": "mock.value"
+        }
     }
 }
\ No newline at end of file
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py 
b/python/flink_agents/plan/tests/test_agent_plan.py
index fa8cdff..20d61d8 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -29,6 +29,7 @@ from flink_agents.api.events.event import Event, InputEvent, 
OutputEvent
 from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.plan.function import PythonFunction
 
 
@@ -43,7 +44,7 @@ class AgentForTest(Agent):  # noqa D101
 
 def test_from_agent():  # noqa D102
     agent = AgentForTest()
-    agent_plan = AgentPlan.from_agent(agent)
+    agent_plan = AgentPlan.from_agent(agent, AgentConfiguration())
     event_type = f"{InputEvent.__module__}.{InputEvent.__name__}"
     actions = agent_plan.get_actions(event_type)
     assert len(actions) == 1
@@ -66,7 +67,7 @@ class InvalidAgent(Agent):  # noqa D101
 def test_to_agent_invalid_signature() -> None:  # noqa D103
     agent = InvalidAgent()
     with pytest.raises(TypeError):
-        AgentPlan.from_agent(agent)
+        AgentPlan.from_agent(agent, AgentConfiguration())
 
 
 class MyEvent(Event):
@@ -116,7 +117,7 @@ class MyAgent(Agent):  # noqa: D101
 
 @pytest.fixture(scope="module")
 def agent_plan() -> AgentPlan:  # noqa: D103
-    return AgentPlan.from_agent(MyAgent())
+    return AgentPlan.from_agent(MyAgent(), AgentConfiguration({"mock.key": 
"mock.value"}))
 
 
 current_dir = Path(__file__).parent
@@ -139,7 +140,7 @@ def test_agent_plan_deserialize(agent_plan: AgentPlan) -> 
None:  # noqa: D103
 
 
 def test_get_resource() -> None:  # noqa: D103
-    agent_plan = AgentPlan.from_agent(MyAgent())
+    agent_plan = AgentPlan.from_agent(MyAgent(), AgentConfiguration())
     mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL)
     assert (
         mock.chat(ChatMessage(role=MessageRole.USER, content="")).content
@@ -162,7 +163,7 @@ def test_add_action_and_resource_to_agent() -> None:  # 
noqa: D103
         desc="mock resource just for testing.",
         connection="mock",
     )
-    agent_plan = AgentPlan.from_agent(my_agent)
+    agent_plan = AgentPlan.from_agent(my_agent, 
AgentConfiguration({"mock.key": "mock.value"}))
     json_value = agent_plan.model_dump_json(serialize_as_any=True, indent=4)
     with Path.open(Path(f"{current_dir}/resources/agent_plan.json")) as f:
         expected_json = f.read()
diff --git a/python/flink_agents/plan/tests/test_configuration.py 
b/python/flink_agents/plan/tests/test_configuration.py
new file mode 100644
index 0000000..7a874ed
--- /dev/null
+++ b/python/flink_agents/plan/tests/test_configuration.py
@@ -0,0 +1,228 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import tempfile
+from pathlib import Path
+
+import pytest
+import yaml
+
+from flink_agents.api.configuration import ConfigOption
+from flink_agents.plan.configuration import AgentConfiguration
+
+
+def test_load_configuration_from_file() -> None:
+    """Test loading configuration from a YAML file."""
+    # Create a temporary YAML file with test data
+    test_data = {
+        "agent": {
+            "database": {
+                "host": "localhost",
+                "port": 5432,
+                "credentials": {"username": "admin", "password": "secret"},
+            },
+            "api": {"endpoint": "/api/v1", "timeout": 30.0},
+            "debug": True,
+        }
+    }
+
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) 
as f:
+        yaml.dump(test_data, f)
+        config_file = f.name
+
+    try:
+        # Load configuration
+        config = AgentConfiguration()
+        config.load_from_file(config_file)
+
+        # Test that nested configuration is properly flattened
+        assert config.get_str('database.host') == 'localhost'
+        assert config.get_int('database.port') == 5432
+        assert config.get_str('database.credentials.username') == 'admin'
+        assert config.get_str('database.credentials.password') == 'secret'
+        assert config.get_str('api.endpoint') == '/api/v1'
+        assert config.get_float('api.timeout') == 30.0
+        assert config.get_bool('debug') is True
+    finally:
+        config_file = Path(config_file)
+        config_file.unlink()
+
+
+def test_load_configuration_without_path() -> None:
+    """Test loading configuration without a path (should not change 
_conf_data)."""
+    config = AgentConfiguration()
+    # Initially _conf_data should be empty or unchanged
+    original_conf_data = config.get_conf_data().copy()
+    config.load_from_file(None)
+    # _conf_data should remain unchanged
+    assert config.get_conf_data() == original_conf_data
+
+
+def test_load_configuration_with_invalid_file() -> None:
+    """Test loading configuration with a non-existent file."""
+    config = AgentConfiguration()
+    with pytest.raises(FileNotFoundError):
+        config.load_from_file('/path/to/nonexistent/file.yaml')
+
+
+def test_load_configuration_with_invalid_yaml() -> None:
+    """Test loading configuration with invalid YAML content."""
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) 
as f:
+        f.write('invalid: yaml: content: [')
+        config_file = f.name
+
+    try:
+        config = AgentConfiguration()
+        with pytest.raises(yaml.YAMLError):
+            config.load_from_file(config_file)
+    finally:
+        config_file = Path(config_file)
+        config_file.unlink()
+
+
+def test_get_int() -> None:
+    """Test get_int method with various inputs."""
+    config = AgentConfiguration({'int_key': 42, 'str_key': '123', 
'invalid_key': 'not_an_int'})
+
+    # Test normal integer value
+    assert config.get_int('int_key') == 42
+
+    # Test string that can be converted to int
+    assert config.get_int('str_key') == 123
+
+    # Test default value when key is not found
+    assert config.get_int('missing_key', 999) == 999
+
+    # Test default value when no default specified
+    assert config.get_int('missing_key') is None
+
+    # Test invalid value that cannot be converted to int
+    with pytest.raises(ValueError, match="Invalid value for invalid_key: 
not_an_int"):
+        config.get_int('invalid_key')
+
+
+def test_get_float() -> None:
+    """Test get_float method with various inputs."""
+    config = AgentConfiguration({'float_key': 3.14, 'int_key': 42, 'str_key': 
'2.5', 'invalid_key': 'not_a_float'})
+
+    # Test normal float value
+    assert config.get_float('float_key') == 3.14
+
+    # Test int value converted to float
+    assert config.get_float('int_key') == 42.0
+
+    # Test string that can be converted to float
+    assert config.get_float('str_key') == 2.5
+
+    # Test default value when key is not found
+    assert config.get_float('missing_key', 1.23) == 1.23
+
+    # Test default value when no default specified
+    assert config.get_float('missing_key') is None
+
+    # Test invalid value that cannot be converted to float
+    with pytest.raises(ValueError, match="Invalid value for invalid_key: 
not_a_float"):
+        config.get_float('invalid_key')
+
+
+def test_get_bool() -> None:
+    """Test get_bool method with various inputs."""
+    config = AgentConfiguration({'bool_key': True, 'false_key': False, 
'str_key': 'true'})
+
+    # Test normal boolean values
+    assert config.get_bool('bool_key') is True
+    assert config.get_bool('false_key') is False
+
+    # Test default value when key is not found
+    assert config.get_bool('missing_key', True) is True
+
+    # Test default value when no default specified
+    assert config.get_bool('missing_key') is None
+
+    # A string value is accepted, and 'true' reads back as True.
+    # NOTE(review): if get_bool relies on Python's bool(), any non-empty string
+    # (including 'false') would also read back as True -- confirm against the
+    # AgentConfiguration.get_bool implementation.
+    assert config.get_bool('str_key') is True
+
+
+def test_get_str() -> None:
+    """Test get_str method with various inputs."""
+    config = AgentConfiguration({'str_key': 'hello', 'int_key': 42, 
'float_key': 3.14})
+
+    # Test normal string value
+    assert config.get_str('str_key') == 'hello'
+
+    # Test int value converted to string
+    assert config.get_str('int_key') == '42'
+
+    # Test float value converted to string
+    assert config.get_str('float_key') == '3.14'
+
+    # Test default value when key is not found
+    assert config.get_str('missing_key', 'default') == 'default'
+
+    # Test default value when no default specified
+    assert config.get_str('missing_key') is None
+
+    # Test a second key that was never set; note that no entry in this
+    # configuration holds an explicit None value.
+    assert config.get_str('none_key') is None
+
+def test_get_with_config_option() -> None:  # noqa: D103
+    data = {
+        "config.str": "config.value",
+        "config.int": 6789,
+        "config.float": "45.5",
+        "config.boolean": True,
+    }
+
+    config = AgentConfiguration(data)
+
+    str_option = ConfigOption("config.str", str, "default_str")
+    int_option = ConfigOption("config.int", int, 123)
+    float_option = ConfigOption("config.float", float, 0.0)
+    bool_option = ConfigOption("config.boolean", bool, False)
+
+    assert config.get(str_option) == "config.value"
+    assert config.get(int_option) == 6789
+    assert config.get(float_option) == 45.5
+    assert config.get(bool_option) is True
+
+    missing_option = ConfigOption("missing.key1", int, 22)
+    assert config.get(missing_option) == 22
+
+    missing_key = ConfigOption("missing.key2", int, None)
+    assert config.get(missing_key) is None
+
+
+def test_get_with_default_value() -> None:  # noqa: D103
+    default_str = ConfigOption("default.str", str, "default_value")
+    default_int = ConfigOption("default.int", int, 100)
+    default_double = ConfigOption("default.double", float, 2.5)
+
+    config = AgentConfiguration()
+
+    assert config.get(default_str) == "default_value"
+    assert config.get(default_int) == 100
+    assert config.get(default_double) == 2.5
+
+
+def test_get_with_null_and_default() -> None:  # noqa: D103
+    nullable_str = ConfigOption("nullable.str", str, "default")
+
+    config = AgentConfiguration()
+    config.set_str("nullable.str", None)
+
+    assert config.get(nullable_str) == "default"
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index 33ffe64..f15d9d9 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -22,6 +22,7 @@ from typing import Any, Callable, Dict, Tuple
 import cloudpickle
 from typing_extensions import override
 
+from flink_agents.api.configuration import ReadableConfiguration
 from flink_agents.api.events.event import Event
 from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
@@ -129,6 +130,17 @@ class FlinkRunnerContext(RunnerContext):
             yield
         return future.result()
 
+    @override
+    def get_config(self) -> ReadableConfiguration:
+        """Get the readable configuration for flink agents.
+
+        The configuration is the one carried by the agent plan of this context.
+
+        Returns
+        -------
+        ReadableConfiguration
+            The configuration for flink agents.
+        """
+        return self.__agent_plan.config
+
 
 def create_flink_runner_context(
     j_runner_context: Any, agent_plan_json: str, executor: ThreadPoolExecutor
diff --git a/python/flink_agents/runtime/local_execution_environment.py 
b/python/flink_agents/runtime/local_execution_environment.py
index 395ff5a..aa5605f 100644
--- a/python/flink_agents/runtime/local_execution_environment.py
+++ b/python/flink_agents/runtime/local_execution_environment.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 from pyflink.common import TypeInformation
 from pyflink.datastream import DataStream, KeySelector, 
StreamExecutionEnvironment
@@ -26,6 +26,7 @@ from flink_agents.api.execution_environment import (
     AgentBuilder,
     AgentsExecutionEnvironment,
 )
+from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.runtime.local_runner import LocalRunner
 
 
@@ -37,15 +38,16 @@ class LocalAgentBuilder(AgentBuilder):
     __output: List[Any]
     __runner: LocalRunner = None
     __executed: bool = False
+    __config: AgentConfiguration
 
     def __init__(
-        self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]]
+        self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]], 
config: AgentConfiguration
     ) -> None:
         """Init empty output list."""
         self.__env = env
         self.__input = input
         self.__output = []
-
+        # Kept so apply() can hand the configuration to the LocalRunner.
+        self.__config = config
+
     def apply(self, agent: Agent) -> AgentBuilder:
         """Create local runner to execute given agent.
 
@@ -54,7 +56,7 @@ class LocalAgentBuilder(AgentBuilder):
         if self.__runner is not None:
             err_msg = "LocalAgentBuilder doesn't support apply multiple 
agents."
             raise RuntimeError(err_msg)
-        self.__runner = LocalRunner(agent)
+        self.__runner = LocalRunner(agent, self.__config)
         self.__env.set_agent(self.__input, self.__output, self.__runner)
         return self
 
@@ -86,6 +88,13 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
     __output: List[Any] = None
     __runner: LocalRunner = None
     __executed: bool = False
+    __config: AgentConfiguration = AgentConfiguration()
+
+    def get_config(self, path: Optional[str] = None) -> AgentConfiguration:
+        """Get configuration of execution environment.
+
+        Parameters
+        ----------
+        path : Optional[str]
+            Path of a YAML configuration file. When it is not None, the file
+            is loaded into the configuration before it is returned.
+
+        Returns
+        -------
+        AgentConfiguration
+            The configuration of this execution environment.
+        """
+        # NOTE(review): __config is declared with a class-level default
+        # instance, so environments may share one configuration object unless
+        # it is rebound per instance elsewhere -- confirm.
+        if path is not None:
+            self.__config.load_from_file(path)
+        return self.__config
 
     def from_list(self, input: list) -> LocalAgentBuilder:
         """Set input list of execution environment."""
@@ -94,7 +103,7 @@ class LocalExecutionEnvironment(AgentsExecutionEnvironment):
             raise RuntimeError(err_msg)
 
         self.__input = input
-        return LocalAgentBuilder(env=self, input=input)
+        return LocalAgentBuilder(env=self, input=input, config=self.__config)
 
     def set_agent(self, input: list, output: list, runner: LocalRunner) -> 
None:
         """Set agent input, output and runner."""
diff --git a/python/flink_agents/runtime/local_runner.py 
b/python/flink_agents/runtime/local_runner.py
index bfca55f..a9ada75 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -29,6 +29,7 @@ from flink_agents.api.metric_group import MetricGroup
 from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.runtime.agent_runner import AgentRunner
 from flink_agents.runtime.local_memory_object import LocalMemoryObject
 
@@ -58,8 +59,9 @@ class LocalRunnerContext(RunnerContext):
     events: deque[Event]
     _store: dict[str, Any]
     _short_term_memory: MemoryObject
+    _config: AgentConfiguration
 
-    def __init__(self, agent_plan: AgentPlan, key: Any) -> None:
+    def __init__(self, agent_plan: AgentPlan, key: Any, config: 
AgentConfiguration) -> None:
         """Initialize a new context with the given agent and key.
 
         Parameters
@@ -77,6 +79,7 @@ class LocalRunnerContext(RunnerContext):
         self._short_term_memory = LocalMemoryObject(
             self._store, LocalMemoryObject.ROOT_KEY
         )
+        self._config = config
 
     @property
     def key(self) -> Any:
@@ -144,6 +147,10 @@ class LocalRunnerContext(RunnerContext):
         yield
         return func_result
 
+    @override
+    def get_config(self) -> AgentConfiguration:
+        """Get the configuration this context was created with."""
+        return self._config
+
 
 class LocalRunner(AgentRunner):
     """Agent runner implementation for local execution, which is
@@ -157,13 +164,16 @@ class LocalRunner(AgentRunner):
         Dictionary of active contexts indexed by key.
     __outputs:
         Outputs generated by agent execution.
+    __config:
+        Internal configuration.
     """
 
     __agent_plan: AgentPlan
     __keyed_contexts: Dict[Any, LocalRunnerContext]
     __outputs: List[Dict[str, Any]]
+    __config: AgentConfiguration
 
-    def __init__(self, agent: Agent) -> None:
+    def __init__(self, agent: Agent, config: AgentConfiguration) -> None:
         """Initialize the runner with the provided agent.
 
         Parameters
@@ -171,9 +181,10 @@ class LocalRunner(AgentRunner):
         agent : Agent
             The agent class to convert and run.
         """
-        self.__agent_plan = AgentPlan.from_agent(agent)
+        self.__agent_plan = AgentPlan.from_agent(agent, config)
         self.__keyed_contexts = {}
         self.__outputs = []
+        self.__config = config
 
     @override
     def run(self, **data: Dict[str, Any]) -> Any:
@@ -197,7 +208,7 @@ class LocalRunner(AgentRunner):
             key = uuid.uuid4()
 
         if key not in self.__keyed_contexts:
-            self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan, 
key)
+            self.__keyed_contexts[key] = LocalRunnerContext(self.__agent_plan, 
key, self.__config)
         context = self.__keyed_contexts[key]
 
         if "value" in data:
diff --git a/python/flink_agents/runtime/remote_execution_environment.py 
b/python/flink_agents/runtime/remote_execution_environment.py
index 8b8c0fe..6faf3b0 100644
--- a/python/flink_agents/runtime/remote_execution_environment.py
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import os
+from pathlib import Path
 from typing import Any, Dict, List, Optional
 
 import cloudpickle
@@ -37,6 +39,7 @@ from flink_agents.api.execution_environment import (
     AgentsExecutionEnvironment,
 )
 from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.configuration import AgentConfiguration
 
 
 class RemoteAgentBuilder(AgentBuilder):
@@ -46,13 +49,15 @@ class RemoteAgentBuilder(AgentBuilder):
     __agent_plan: AgentPlan = None
     __output: DataStream = None
     __t_env: StreamTableEnvironment
+    __config: AgentConfiguration
 
     def __init__(
-        self, input: DataStream, t_env: Optional[StreamTableEnvironment] = None
+        self, input: DataStream, config: AgentConfiguration, t_env: 
Optional[StreamTableEnvironment] = None
     ) -> None:
         """Init method of RemoteAgentBuilder."""
         self.__input = input
         self.__t_env = t_env
+        self.__config = config
 
     def apply(self, agent: Agent) -> "AgentBuilder":
         """Set agent of execution environment.
@@ -65,7 +70,7 @@ class RemoteAgentBuilder(AgentBuilder):
         if self.__agent_plan is not None:
             err_msg = "RemoteAgentBuilder doesn't support apply multiple 
agents yet."
             raise RuntimeError(err_msg)
-        self.__agent_plan = AgentPlan.from_agent(agent)
+        self.__agent_plan = AgentPlan.from_agent(agent, self.__config)
         return self
 
     def to_datastream(
@@ -133,10 +138,26 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
     """Implementation of AgentsExecutionEnvironment for execution with 
DataStream."""
 
     __env: StreamExecutionEnvironment
+    __config: AgentConfiguration
 
     def __init__(self, env: StreamExecutionEnvironment) -> None:
         """Init method of RemoteExecutionEnvironment."""
         self.__env = env
+        self.__config = AgentConfiguration()
+        # When FLINK_CONF_DIR is set, seed the configuration from the
+        # config.yaml located in that directory.
+        conf_dir = os.environ.get("FLINK_CONF_DIR")
+        if conf_dir is not None:
+            self.__config.load_from_file(str(Path(conf_dir) / "config.yaml"))
+
+    def get_config(self, path: Optional[str] = None) -> AgentConfiguration:
+        """Get the writable configuration for flink agents.
+
+        Parameters
+        ----------
+        path : Optional[str]
+            Path of a YAML configuration file. When it is not None, the file
+            is loaded into the configuration before it is returned.
+
+        Returns
+        -------
+        AgentConfiguration
+            The configuration for flink agents.
+        """
+        # Honor ``path`` the same way LocalExecutionEnvironment.get_config
+        # does, instead of silently ignoring it.
+        if path is not None:
+            self.__config.load_from_file(path)
+        return self.__config
 
     @staticmethod
     def __process_input_datastream(
@@ -166,7 +187,7 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         """
         input = self.__process_input_datastream(input, key_selector)
 
-        return RemoteAgentBuilder(input=input)
+        return RemoteAgentBuilder(input=input, config=self.__config)
 
     def from_table(
         self,
@@ -190,7 +211,7 @@ class 
RemoteExecutionEnvironment(AgentsExecutionEnvironment):
         input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())
 
         input = self.__process_input_datastream(input, key_selector)
-        return RemoteAgentBuilder(input=input, t_env=t_env)
+        return RemoteAgentBuilder(input=input, config=self.__config, 
t_env=t_env)
 
     def from_list(self, input: List[Dict[str, Any]]) -> 
"AgentsExecutionEnvironment":
         """Set input list of agent execution.
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 8ba7b83..a586a00 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
     "apache-flink==1.20.1",
     "pydantic==2.11.4",
     "docstring-parser==0.16",
+    "pyyaml==6.0.2",
     #TODO: Seperate integration dependencies from project
     "ollama==0.4.8",
     "dashscope~=1.24.2",
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 41644d5..a6c4e7a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.agents.runtime.context;
 
 import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
 import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.resource.Resource;
@@ -109,4 +110,9 @@ public class RunnerContextImpl implements RunnerContext {
         }
         return agentPlan.getResource(name, type);
     }
+
+    /** Returns the agent plan's configuration as a {@link ReadableConfiguration}. */
+    @Override
+    public ReadableConfiguration getConfig() {
+        return agentPlan.getConfig();
+    }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
index 424c2fd..f4f9e6a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
@@ -21,15 +21,20 @@ package org.apache.flink.agents.runtime.env;
 import org.apache.flink.agents.api.Agent;
 import org.apache.flink.agents.api.AgentBuilder;
 import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.runtime.CompileUtils;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.YamlParserUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
+import java.io.File;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -43,8 +48,19 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
 
     private final StreamExecutionEnvironment env;
 
+    private final AgentConfiguration config;
+
+    public static final String FLINK_CONF_FILENAME = "config.yaml";
+
     public RemoteExecutionEnvironment(StreamExecutionEnvironment env) {
         this.env = env;
+        final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+        this.config = loadAgentConfiguration(configDir);
+    }
+
+    @Override
+    public AgentConfiguration getConfig() {
+        return config;
     }
 
     @Override
@@ -55,13 +71,13 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
 
     @Override
     public <T, K> AgentBuilder fromDataStream(DataStream<T> input, KeySelector<T, K> keySelector) {
-        return new RemoteAgentBuilder<>(input, keySelector, env);
+        return new RemoteAgentBuilder<>(input, keySelector, env, config);
     }
 
     @Override
     public <K> AgentBuilder fromTable(
             Table input, StreamTableEnvironment tableEnv, KeySelector<Object, K> keySelector) {
-        return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env);
+        return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env, config);
     }
 
     @Override
@@ -69,6 +85,24 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
         env.execute();
     }
 
+    @SuppressWarnings("unchecked")
+    public static AgentConfiguration loadAgentConfiguration(String configDir) {
+        try {
+            if (configDir == null) {
+                return new AgentConfiguration();
+            }
+            final Map<String, Object> configData =
+                    (Map<String, Object>)
+                            YamlParserUtils.loadYamlFile(new File(configDir, FLINK_CONF_FILENAME))
+                                    .getOrDefault("agent", new HashMap<>());
+
+            return new AgentConfiguration(configData);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to load Flink Agents configuration from " + configDir, e);
+        }
+    }
+
     /** Implementation of AgentBuilder for remote execution environment. */
     private static class RemoteAgentBuilder<T, K> implements AgentBuilder {
 
@@ -76,6 +110,7 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
         private final KeySelector<T, K> keySelector;
         private final StreamExecutionEnvironment env;
         private final StreamTableEnvironment tableEnv;
+        private final AgentConfiguration config;
 
         private AgentPlan agentPlan;
         private DataStream<Object> outputDataStream;
@@ -84,11 +119,13 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
         public RemoteAgentBuilder(
                 DataStream<T> inputDataStream,
                 KeySelector<T, K> keySelector,
-                StreamExecutionEnvironment env) {
+                StreamExecutionEnvironment env,
+                AgentConfiguration config) {
             this.inputDataStream = inputDataStream;
             this.keySelector = keySelector;
             this.env = env;
             this.tableEnv = null;
+            this.config = config;
         }
 
         // Constructor for Table input
@@ -97,17 +134,19 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
                 Table inputTable,
                 StreamTableEnvironment tableEnv,
                 KeySelector<Object, K> keySelector,
-                StreamExecutionEnvironment env) {
+                StreamExecutionEnvironment env,
+                AgentConfiguration config) {
             this.inputDataStream = (DataStream<T>) tableEnv.toDataStream(inputTable);
             this.keySelector = (KeySelector<T, K>) keySelector;
             this.env = env;
             this.tableEnv = tableEnv;
+            this.config = config;
         }
 
         @Override
         public AgentBuilder apply(Agent agent) {
             try {
-                this.agentPlan = new AgentPlan(agent);
+                this.agentPlan = new AgentPlan(agent, config);
                 return this;
             } catch (Exception e) {
                 throw new RuntimeException("Failed to create agent plan from agent", e);
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java
new file mode 100644
index 0000000..4784a28
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironmentTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.env;
+
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.UUID;
+
+import static org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment.FLINK_CONF_FILENAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class RemoteExecutionEnvironmentTest {
+    @TempDir private File tmpDir;
+
+    @Test
+    void testLoadAgentConfiguration() throws FileNotFoundException {
+        File confFile = new File(tmpDir, FLINK_CONF_FILENAME);
+
+        try (final PrintWriter pw = new PrintWriter(confFile)) {
+            pw.println("agent:");
+            pw.println("    key1: ");
+            pw.println("        key2: v1");
+            pw.println("        key3: 'v2'");
+            pw.println("    key4: 1");
+            pw.println("    key5: 'v3'");
+            pw.println("    key6: 1.5");
+            pw.println("    key7: true");
+        }
+
+        AgentConfiguration conf =
+                RemoteExecutionEnvironment.loadAgentConfiguration(tmpDir.getAbsolutePath());
+
+        assertThat(conf.getConfData().keySet()).hasSize(6);
+        assert conf.getConfData().get("key1.key2").equals("v1");
+        assert conf.getConfData().get("key1.key3").equals("v2");
+        assert conf.getConfData().get("key4").equals(1);
+        assert conf.getConfData().get("key5").equals("v3");
+        assert conf.getConfData().get("key6").equals(1.5);
+        assert conf.getConfData().get("key7").equals(true);
+    }
+
+    @Test
+    void testLoadAgentConfigurationFailIfNotLoaded() {
+        assertThatThrownBy(
+                        () ->
+                                RemoteExecutionEnvironment.loadAgentConfiguration(
+                                        "/some/path/" + UUID.randomUUID()))
+                .isInstanceOf(RuntimeException.class);
+    }
+
+    @Test
+    void testLoadAgentConfigurationFailIfNull() {
+        AgentConfiguration conf = RemoteExecutionEnvironment.loadAgentConfiguration(null);
+        assertThat(conf.getConfData()).isEmpty();
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
index 2b95f37..ca49b85 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/MemoryRefTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.agents.runtime.memory;
 
+import org.apache.flink.agents.api.configuration.ReadableConfiguration;
 import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.MemoryRef;
 import org.apache.flink.agents.api.context.RunnerContext;
@@ -89,6 +90,11 @@ public class MemoryRefTest {
         public Resource getResource(String name, ResourceType type) throws Exception {
             return null;
         }
+
+        @Override
+        public ReadableConfiguration getConfig() {
+            return null;
+        }
     }
 
     @BeforeEach
diff --git a/tools/e2e.sh b/tools/e2e.sh
old mode 100644
new mode 100755
index f967c34..1a15cfb
--- a/tools/e2e.sh
+++ b/tools/e2e.sh
@@ -50,6 +50,7 @@ echo "tmpdir:$tempdir"
 
 jar_path=e2e-test/agent-plan-compatibility-test/target/flink-agents*.jar
 run_test "Agent plan compatibility end-to-end test" "bash e2e-test/test-scripts/test_agent_plan_compatibility.sh $tempdir $jar_path"
+run_test "Cross-Language Config Option end-to-end test" "bash e2e-test/test-scripts/test_java_config_in_python.sh"
 
 printf "\n$PASSED/$TOTAL bash e2e-tests passed\n"
 

Reply via email to