This is an automated email from the ASF dual-hosted git repository.
xintongsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new e05521ba [doc] Add YAML API documentation and quickstart example.
(#705)
e05521ba is described below
commit e05521ba388bf7e27dfcc7cbd2a65ead589119d2
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed May 27 23:03:58 2026 +0800
[doc] Add YAML API documentation and quickstart example. (#705)
---
docs/content/docs/development/chat_models.md | 2 +-
docs/content/docs/development/embedding_models.md | 2 +-
.../docs/development/integrate_with_flink.md | 2 +-
docs/content/docs/development/mcp.md | 2 +-
docs/content/docs/development/memory/_index.md | 2 +-
docs/content/docs/development/prompts.md | 2 +-
docs/content/docs/development/tool_use.md | 2 +-
docs/content/docs/development/vector_stores.md | 2 +-
docs/content/docs/development/yaml.md | 555 +++++++++++++++++++++
docs/content/docs/get-started/overview.md | 1 +
.../docs/get-started/quickstart/yaml_agent.md | 375 ++++++++++++++
.../agents/examples/YamlWorkflowAgentExample.java | 92 ++++
.../resources/yaml/yaml_review_analysis_agent.yaml | 72 +++
.../quickstart/yaml_review_analysis_agent.yaml | 65 +++
.../quickstart/yaml_workflow_agent_example.py | 88 ++++
python/pyproject.toml | 1 +
16 files changed, 1257 insertions(+), 8 deletions(-)
diff --git a/docs/content/docs/development/chat_models.md
b/docs/content/docs/development/chat_models.md
index 0fc50545..15b330ae 100644
--- a/docs/content/docs/development/chat_models.md
+++ b/docs/content/docs/development/chat_models.md
@@ -1,6 +1,6 @@
---
title: Chat Models
-weight: 3
+weight: 4
type: docs
---
<!--
diff --git a/docs/content/docs/development/embedding_models.md
b/docs/content/docs/development/embedding_models.md
index 5546ee9c..3646bcf5 100644
--- a/docs/content/docs/development/embedding_models.md
+++ b/docs/content/docs/development/embedding_models.md
@@ -1,6 +1,6 @@
---
title: Embedding Models
-weight: 5
+weight: 6
type: docs
---
<!--
diff --git a/docs/content/docs/development/integrate_with_flink.md
b/docs/content/docs/development/integrate_with_flink.md
index 150e5c0d..fe13d291 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -1,6 +1,6 @@
---
title: Integrate with Flink
-weight: 10
+weight: 11
type: docs
---
<!--
diff --git a/docs/content/docs/development/mcp.md
b/docs/content/docs/development/mcp.md
index 0d5cf9ad..929f2e47 100644
--- a/docs/content/docs/development/mcp.md
+++ b/docs/content/docs/development/mcp.md
@@ -1,6 +1,6 @@
---
title: MCP
-weight: 9
+weight: 10
type: docs
---
<!--
diff --git a/docs/content/docs/development/memory/_index.md
b/docs/content/docs/development/memory/_index.md
index 760fdebd..429faa31 100644
--- a/docs/content/docs/development/memory/_index.md
+++ b/docs/content/docs/development/memory/_index.md
@@ -2,7 +2,7 @@
title: Memory
bold: true
bookCollapseSection: true
-weight: 7
+weight: 8
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/development/prompts.md
b/docs/content/docs/development/prompts.md
index b9a5fe88..dc15a349 100644
--- a/docs/content/docs/development/prompts.md
+++ b/docs/content/docs/development/prompts.md
@@ -1,6 +1,6 @@
---
title: Prompts
-weight: 4
+weight: 5
type: docs
---
<!--
diff --git a/docs/content/docs/development/tool_use.md
b/docs/content/docs/development/tool_use.md
index e7ab7a23..895ef2db 100644
--- a/docs/content/docs/development/tool_use.md
+++ b/docs/content/docs/development/tool_use.md
@@ -1,6 +1,6 @@
---
title: Tool Use
-weight: 8
+weight: 9
type: docs
---
<!--
diff --git a/docs/content/docs/development/vector_stores.md
b/docs/content/docs/development/vector_stores.md
index 3ef3fa0a..e8582649 100644
--- a/docs/content/docs/development/vector_stores.md
+++ b/docs/content/docs/development/vector_stores.md
@@ -1,6 +1,6 @@
---
title: Vector Stores
-weight: 6
+weight: 7
type: docs
---
<!--
diff --git a/docs/content/docs/development/yaml.md
b/docs/content/docs/development/yaml.md
new file mode 100644
index 00000000..9bea5c46
--- /dev/null
+++ b/docs/content/docs/development/yaml.md
@@ -0,0 +1,555 @@
+---
+title: YAML API
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Overview
+
+Beyond the Python and Java programmatic APIs, Flink Agents offers a
**declarative YAML API** for describing agents. Compared with the programmatic
APIs, the YAML API has the following advantages:
+
+- **Human-friendly**: low entry barrier and easy to templatize across
deployments.
+- **Coding-agent-friendly**: a fixed schema reduces token cost, enables strict
schema validation, and decouples configuration changes (declared parameters)
from logic changes (action code).
+
+A Flink Agents application is composed of three concepts, all of which appear
in the YAML API:
+
+- **Events**: the messages flowing inside an agent. Every event has a type
used for routing.
+- **Actions**: code snippets triggered by events and emitting new events. They
carry the agent's business logic.
+- **Resources**: reusable components — Chat Models, Tools, Prompts, Vector
Stores, MCP servers, etc. — referenced by actions at runtime.
+
+A YAML file describes **Resources** and **Actions** declaratively; the
**Event** types are referenced by name (built-in event aliases like `input` /
`chat_response`, or your own event-type strings). The implementation classes
(action functions, tool functions, custom types) still live in your Python or
Java code; the YAML file only describes *how* those pieces are wired together.
+
+If you'd rather declare agents directly in code, see [Workflow Agent]({{< ref
"docs/development/workflow_agent" >}}) and [ReAct Agent]({{< ref
"docs/development/react_agent" >}}).
+
+## Declaring an Agent
+
+This section walks through:
+
+1. Declaring a single agent in one YAML file.
+2. Declaring multiple agents in one YAML file.
+3. Declaring shared Resources and Actions that any agent in the file (or any
later-loaded file) can reuse.
+
+### Declaring a single agent
+
+The example below declares one agent named `review_analysis_agent` with a
chat-model connection, a chat-model setup, a prompt, a function tool, and two
actions. It demonstrates every field you typically use.
+
+```yaml
+agents:
+ - name: review_analysis_agent
+ description: Analyze product reviews and emit a satisfaction score.
+
+ # ---- Actions ----
+ actions:
+ - name: process_input
+ function: my_pkg.actions:process_input
+ listen_to: [input]
+ type: python
+ - name: process_chat_response
+ function: my_pkg.actions:process_chat_response
+ listen_to: [chat_response]
+ type: python
+
+ # ---- Resources ----
+ chat_model_connections:
+ - name: ollama_server
+ clazz: ollama
+ type: python
+ base_url: http://localhost:11434
+
+ chat_model_setups:
+ - name: review_analysis_model
+ clazz: ollama
+ type: python
+ connection: ollama_server
+ model: qwen3:8b
+ prompt: review_analysis_prompt
+ tools: [notify_shipping_manager]
+ extract_reasoning: true
+
+ prompts:
+ - name: review_analysis_prompt
+ messages:
+ - role: system
+ content: "Analyze the review and return JSON with score + reasons."
+ - role: user
+ content: "{input}"
+
+ tools:
+ - name: notify_shipping_manager
+ function: my_pkg.actions:notify_shipping_manager
+ type: python
+```
+
+#### Agent properties
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Agent name. Must be unique across the environment. Used to
apply the agent by name later. |
+| `description` | no | Free-form description of what the agent does. Not
surfaced at runtime. |
+
+#### Resources
+
+All resource sections are declared as lists under the agent. Each entry has a
`name` unique within its section.
+
+**Prompt** — declarative prompt template; pick exactly one of `text` or
`messages`.
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Prompt name (referenced by chat-model setups). |
+| `text` | one-of | Single-string prompt template. Corresponds to
`Prompt.from_text` / `Prompt.fromText`. |
+| `messages` | one-of | Multi-turn message template. Corresponds to
`Prompt.from_messages` / `Prompt.fromMessages`. Each entry has `role` (`system`
/ `user` / `assistant` / `tool`) and `content`. |
+
+```yaml
+prompts:
+ - name: prompt1
+ messages:
+ - {role: system, content: "..."}
+ - {role: user, content: "{input}"}
+ - name: prompt2
+ text: "this is the {value}"
+```
+
+**Tool** — points at a callable that the chat model can invoke.
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Tool name (referenced from `chat_model_setups[].tools`). |
+| `function` | yes | Fully-qualified callable in the form
`<module-or-class>:<qualname>`. See [Function
references](#function-references). |
+| `type` | no | Implementation language: `python` or `java`. Defaults to
`python` (see [Selecting the implementation
language](#selecting-the-implementation-language)). |
+| `parameter_types` | java only | Required for Java tools — one Java type FQN
per declared parameter, in order. Forbidden for Python tools (the signature is
reflected from the callable). |
+
+```yaml
+tools:
+ - name: my_tool
+ function: my_pkg.tools:my_tool
+ type: python
+```
+
+For **Java tools**, `parameter_types` is required because Java methods can be
overloaded — list one FQN per declared parameter, in order. Generic type
arguments are not part of the JVM method descriptor and must not be included
(`java.util.List`, not `java.util.List<String>`). Boxed primitives
(`java.lang.Integer`, etc.) and bare primitives (`int`, `boolean`, ...) are
both accepted.
+
+```yaml
+tools:
+ - name: add
+ type: java
+ function: com.example.MyTools:add
+ parameter_types: [java.lang.Integer, java.lang.Integer]
+```
+
+**Skills** — bundles of agent skill assets loaded from one or more sources. At
least one of `paths` / `urls` / `classpath` / `package` must be non-empty;
multiple sources can coexist.
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Skills resource name. |
+| `paths` | at least one | `local` scheme: list of directories or `.zip` files. |
+| `urls` | at least one | `url` scheme: list of `http(s)` URLs pointing to `.zip`
archives. |
+| `classpath` | at least one | `classpath` scheme (Java runtime only): list of
classpath resource paths. |
+| `package` | at least one | `package` scheme (Python runtime only): list of
`{package, resource}` pairs. |
+
+```yaml
+skills:
+ - name: agent_skills
+ paths:
+ - ./skills
+ urls:
+ - https://example.com/skills.zip
+ classpath:
+ - skills/my-skills
+ package:
+ - package: my_pkg
+ resource: skills/
+```
+
+`classpath` is accepted by the Python parser for schema parity with Java but
rejected at runtime, because the Python runtime does not register a `classpath`
handler.
+
+**ResourceDescriptor-backed resources** — `chat_model_connections`,
`chat_model_setups`, `embedding_model_connections`, `embedding_model_setups`,
`vector_stores`, `mcp_servers`. Unlike the resources above (which are
constructed inline by the loader), these are described by a
`ResourceDescriptor` and instantiated by the framework at runtime. They all
share the same shape:
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Resource name (referenced by other resources or actions). |
+| `clazz` | yes | Either a built-in [alias](#class-aliases) (e.g. `ollama`,
`openai`) or a fully-qualified class path. |
+| `type` | no | Implementation language: `python` or `java`. Defaults to
`python` (see [Selecting the implementation
language](#selecting-the-implementation-language)). |
+| *extras* | no | Any additional keys are forwarded verbatim as
`ResourceDescriptor` init arguments (e.g. `base_url`, `endpoint`, `model`,
`request_timeout`). |
+
+```yaml
+chat_model_connections:
+ - name: my_connection
+ clazz: ollama
+ type: python
+ base_url: http://localhost:11434
+
+mcp_servers:
+ - name: my_mcp
+ clazz: mcp
+ type: python
+ endpoint: http://127.0.0.1:8000/mcp
+```
+
+#### Actions
+
+`actions:` is a list. Each entry is either an inline action **map** or, when
reusing a shared action, a bare **string** referring to the shared action's
name.
+
+Inline action (map) fields:
+
+| Field | Required | Description |
+|-------|----------|-------------|
+| `name` | yes | Action name (unique within the agent). |
+| `function` | yes | Fully-qualified callable in the form
`<module-or-class>:<qualname>`. See [Function
references](#function-references). |
+| `listen_to` | yes | List of event types the action listens to. Built-in
[event aliases](#event-aliases) (`input`, `chat_request`, ...) or your own
event-type strings. |
+| `type` | no | Implementation language: `python` or `java`. Defaults to
`python` (see [Selecting the implementation
language](#selecting-the-implementation-language)). |
+| `config` | no | Free-form configuration map passed to the action at runtime.
|
+
+```yaml
+actions:
+ - name: action1
+ function: my_pkg.actions:action1
+ listen_to: [input]
+ type: python
+ - name: action2
+ function: my_pkg.actions:action2
+ listen_to: [chat_response]
+ type: python
+ - action3 # shared action reference (declared at file
level)
+```
+
+Action method signatures are fixed (`(Event, RunnerContext)`), so there is no
`parameter_types` field on actions.
+
+### Declaring multiple agents in one file
+
+A single YAML file can declare more than one agent under `agents:`. Each agent
is independent and gets its own resource set; agent names must be unique within
the file (and across the whole environment, once loaded).
+
+```yaml
+agents:
+ - name: agent1
+ description: The first agent
+ # actions
+ # resources
+
+ - name: agent2
+ description: The second agent
+ # actions
+ # resources
+```
+
+### Declaring and reusing shared Resources and Actions
+
+Any resource section (`prompts`, `tools`, `chat_model_connections`, ...) or
`actions:` declared as a **top-level sibling** of `agents:` is **shared**: it
is registered on the `AgentsExecutionEnvironment` itself, and any agent in the
file — or in any file later loaded into the same environment — can reference it
by name.
+
+**Shared resources** are referenced inside an agent simply by name:
+
+```yaml
+agents:
+ - name: agent1
+ description: The first agent
+ chat_model_setups:
+ - name: my_llm
+ clazz: ollama
+ connection: my_connection # references the shared connection
+ thinking: false
+
+ - name: agent2
+ description: The second agent
+ chat_model_setups:
+ - name: my_llm
+ clazz: ollama
+ connection: my_connection # same shared connection reused
+ thinking: true
+
+# shared resource at the file level
+chat_model_connections:
+ - name: my_connection
+ clazz: ollama
+ base_url: http://localhost:11434
+```
+
+**Shared actions** are referenced inside an agent's `actions:` list by writing
the action's name as a bare string:
+
+```yaml
+agents:
+ - name: agent1
+ description: The first agent
+ actions:
+ - action1 # shared action
+ - name: my_action
+ function: my_pkg.actions:my_action
+ listen_to: [input]
+
+ - name: agent2
+ description: The second agent
+ actions:
+ - action1 # same shared action reused
+
+# shared actions at the file level
+actions:
+ - name: action1
+ function: my_pkg.actions:action1
+ listen_to: [input]
+ type: python
+ - name: action2
+ function: my_pkg.actions:action2
+ listen_to: [chat_response]
+ type: python
+```
+
+{{< hint info >}}
+Composing one agent across multiple YAML files is **not** supported. If two
files loaded into the same environment declare the same agent name, or the same
shared resource/action name, the second `load_yaml` call raises an error.
+{{< /hint >}}
+
+## Loading and Running
+
+`AgentsExecutionEnvironment` exposes a `load_yaml` / `loadYaml` method that
parses one or more YAML files and:
+
+- registers all shared resources and shared actions on the environment, and
+- registers all declared agents on the environment.
+
+Once loaded, an agent declared in YAML is applied **by name** through the same
`AgentBuilder` as a code-defined agent — `.apply(...)` accepts either an
`Agent` instance or the name of an agent previously registered on the
environment.
+
+{{< tabs "Load and Apply" >}}
+
+{{< tab "Python" >}}
+```python
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+agents_env.load_yaml("path/to/agents.yaml")
+
+review_analysis_res_stream = (
+ agents_env.from_datastream(
+ input=product_review_stream, key_selector=lambda x: x.id
+ )
+ .apply("review_analysis_agent") # look up the agent by name
+ .to_datastream()
+)
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+agentsEnv.loadYaml(Paths.get("path/to/agents.yaml"));
+
+DataStream<Object> outputStream =
+ agentsEnv
+ .fromDataStream(inputStream, (KeySelector<MyPojo, String>)
MyPojo::getId)
+ .apply("review_analysis_agent") // look up the agent by name
+ .toDataStream();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+`load_yaml` / `loadYaml` accepts either a single path or multiple paths (a list
in Python, multiple `Path` arguments in Java), and **can be called multiple
times**. Multiple calls accumulate on the same environment; the same uniqueness
rules apply across the combined state. Duplicate agent names, shared-resource
names, or shared-action names — within a file or across files — raise an error.
+
+{{< tabs "Multi-File Loading" >}}
+
+{{< tab "Python" >}}
+```python
+# Load multiple files in one call
+agents_env.load_yaml(["./agents.yaml", "./shared.yaml"])
+
+# Or call repeatedly — same result
+agents_env.load_yaml("./agents.yaml")
+agents_env.load_yaml("./shared.yaml")
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+// Load multiple files in one call
+agentsEnv.loadYaml(Paths.get("./agents.yaml"), Paths.get("./shared.yaml"));
+
+// Or call repeatedly — same result
+agentsEnv.loadYaml(Paths.get("./agents.yaml"));
+agentsEnv.loadYaml(Paths.get("./shared.yaml"));
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+A common pattern is to split a topology file (the agents themselves) from an
infrastructure file (chat-model connections, vector stores, ...). The
infrastructure file can be swapped per environment (dev / staging / prod)
without touching the agent definitions.
+
+For an end-to-end runnable walkthrough that loads a YAML-declared agent and
runs it on Flink, see [YAML Agent Quickstart]({{< ref
"docs/get-started/quickstart/yaml_agent" >}}).
+
+## Advanced Topics
+
+### Function references
+
+Actions and function tools point at user code through a single `function:`
string in the form:
+
+```
+<module-or-class>:<qualname>
+```
+
+The colon separates the **left side** — a Python module or a Java class FQN —
from the **right side** — the attribute path inside it.
+
+{{< tabs "Function Reference Examples" >}}
+
+{{< tab "Python" >}}
+```yaml
+# Top-level function in a module
+function: my_pkg.actions:process_input
+
+# Static method on a class (nested via dots in the qualname)
+function: my_pkg.agents:ReviewAnalysisAgent.process_input
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```yaml
+# Static method on a class
+function: com.example.MyActions:processInput
+
+# Static method on a nested/inner class
+function: com.example.Outer$Inner:processInput
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+### Selecting the implementation language
+
+Every resource, tool, and action accepts an optional `type:` field with values
`python` or `java`. When omitted, the default is **`python`** — both the Python
loader and the Java loader treat a missing `type:` as `python`.
+
+This means a YAML file loaded by the **Java loader** must set `type: java`
explicitly on every resource, tool, and action whose implementation is in Java.
Omitting `type:` on a Java resource in a Java-loaded YAML will make the loader
try to build a Python-wrapping descriptor — usually not what you want.
+
+When `type:` resolves to the **opposite** language of the loader, the loader
builds a cross-language descriptor that delegates to the other-language
implementation — see [Cross-language agents](#cross-language-agents).
+
+### Provider aliases
+
+For `clazz:` on resource descriptors and for event types in `listen_to:`, you
can use a short alias instead of a fully-qualified class path or a full
event-type string.
+
+#### Event aliases
+
+| Alias | Event type |
+| ------------------------------ | -------------------------------- |
+| `input` | `InputEvent` |
+| `output` | `OutputEvent` |
+| `chat_request` | `ChatRequestEvent` |
+| `chat_response` | `ChatResponseEvent` |
+| `tool_request` | `ToolRequestEvent` |
+| `tool_response` | `ToolResponseEvent` |
+| `context_retrieval_request` | `ContextRetrievalRequestEvent` |
+| `context_retrieval_response` | `ContextRetrievalResponseEvent` |
+
+For custom event types defined in your code, write the event's full
`EVENT_TYPE` string instead of an alias.
+
+#### Class aliases
+
+Aliases for `clazz:` are keyed on resource type **and** implementation
language. The same alias (e.g. `ollama`) resolves to a different class for
`chat_model_connections` vs `chat_model_setups`, and for `python` vs `java`.
+
+Common chat-model aliases:
+
+| Alias | `type: python` | `type: java`
|
+| -------------------- | --------------------------- |
--------------------------- |
+| `ollama` | Ollama (Python) | Ollama (Java)
|
+| `openai` | OpenAI Completions (Python) | —
|
+| `openai_completions` | — | OpenAI Completions
(Java) |
+| `openai_responses` | — | OpenAI Responses (Java)
|
+| `anthropic` | Anthropic (Python) | Anthropic (Java)
|
+| `azure_openai` | Azure OpenAI (Python) | —
|
+| `azure` | — | Azure OpenAI (Java)
|
+| `tongyi` | Tongyi (Python) | —
|
+
+Embedding-model and vector-store aliases follow the same scheme. The full
alias tables live in `flink_agents.api.yaml.aliases` (Python) and
`org.apache.flink.agents.api.yaml.Aliases` (Java).
+
+If `clazz:` is not a known alias, the loader passes it through as-is — write a
fully-qualified class path (e.g.
`flink_agents.integrations.chat_models.ollama_chat_model.OllamaChatModelSetup`)
for providers you've added yourself.
+
+### Cross-language agents
+
+Setting `type:` to the **opposite** language of the loader bridges Python and
Java pieces in the same agent. The loader resolves the alias against the
cross-language bucket and wraps the resource in the appropriate cross-language
proxy.
+
+Example — a Python-side agent with a Python Ollama chat model that calls a
**Java** function tool, plus a second chat model that uses a **Java** Ollama
setup:
+
+```yaml
+agents:
+ - name: cross_language_agent
+ actions:
+ - name: process_input
+ function: my_pkg.actions:process_input
+ listen_to: [input]
+ - name: process_chat_response
+ function: my_pkg.actions:process_chat_response
+ listen_to: [chat_response]
+
+ chat_model_connections:
+ # Python Ollama connection used by the math chat model
+ - name: ollama_connection
+ clazz: ollama
+ request_timeout: 240.0
+ # Java Ollama connection used by the creative chat model
+ - name: ollama_connection_java
+ clazz: ollama
+ type: java
+ endpoint: http://localhost:11434
+ requestTimeout: 240
+
+ chat_model_setups:
+ - name: math_chat_model
+ clazz: ollama
+ connection: ollama_connection
+ model: qwen3:1.7b
+ tools: [calculate_bmi] # python -> java tool via the
bridge
+
+ - name: creative_chat_model
+ clazz: ollama
+ type: java
+ connection: ollama_connection_java
+ model: qwen3:1.7b
+
+ tools:
+ - name: calculate_bmi
+ type: java
+ function: com.example.HealthTools:calculateBMI
+ parameter_types: [java.lang.Double, java.lang.Double]
+```
+
+Loaded with `agents_env.load_yaml(...)` on the Python side, this produces an
agent where:
+
+- The `process_input` and `process_chat_response` actions are Python
functions.
+- `math_chat_model` is a Python Ollama setup that calls a Java function tool
through the cross-language tool bridge.
+- `creative_chat_model` is a Java Ollama setup driven from the Python loader
via the Java chat-model wrapper.
+
+Not every resource type is cross-language. Currently `chat_model_connections`,
`chat_model_setups`, `embedding_model_connections`, `embedding_model_setups`,
and `vector_stores` support `type:` on the opposite language; others (e.g.
`mcp_servers`) do not.
+
+## YAML API Specification
+
+To help users and coding agents understand the YAML format and validate YAML
files, Flink Agents publishes a language-neutral **JSON Schema** for the YAML
document. The schema is checked in at
[`docs/yaml-schema.json`](https://github.com/apache/flink-agents/blob/main/docs/yaml-schema.json)
— point your IDE's YAML language server at it for inline validation and
autocompletion.
+
+The schema benefits both humans and machines:
+
+- It defines an interface contract, so LLMs and external systems can interact
with Flink Agents declaratively.
+- It is language-neutral, so it doubles as the compatibility contract between
the Python and the Java loader.
+
+### Python and Java mapping
+
+When loading a YAML file, each runtime parses it into typed in-memory
representations and validates it using that runtime's native validation
library:
+
+- The **Python loader** parses YAML into Pydantic `BaseModel`s declared in
`flink_agents.api.yaml.specs`. Pydantic enforces the schema (required fields,
value types, `extra="forbid"` on unknown keys, mutually-exclusive
`text`/`messages` on prompts, ...).
+- The **Java loader** parses YAML into POJOs declared in
`org.apache.flink.agents.api.yaml.spec`, validated by Jackson with equivalent
rules.
+
+### Consistency guarantees
+
+Because Pydantic models are easier to author and evolve than raw JSON Schema,
the **ground truth** is the Pydantic spec — the checked-in
`docs/yaml-schema.json` is exported from it. Continuous tests then verify
cross-runtime consistency:
+
+- the JSON Schema exported by the Pydantic specs matches the checked-in
`docs/yaml-schema.json`;
+- the JSON Schema exported by the Java POJOs matches the checked-in
`docs/yaml-schema.json`;
+- the Pydantic specs stay aligned with the Python `Agent` API;
+- the Java POJOs stay aligned with the Java `Agent` API.
+
+This keeps the YAML API a true cross-language contract: a YAML file that
validates against the schema is guaranteed to load and run on either the Python
or the Java loader (subject only to the language-specific differences
documented above, such as `parameter_types` on Java tools).
diff --git a/docs/content/docs/get-started/overview.md
b/docs/content/docs/get-started/overview.md
index fb31c74f..4172d14e 100644
--- a/docs/content/docs/get-started/overview.md
+++ b/docs/content/docs/get-started/overview.md
@@ -45,3 +45,4 @@ To get started with Apache Flink Agents, you can checkout the
following quicksta
- [Workflow Agent Quickstart]({{< ref
"docs/get-started/quickstart/workflow_agent" >}})
- [ReAct Agent Quickstart]({{< ref "docs/get-started/quickstart/react_agent"
>}})
+- [YAML Agent Quickstart]({{< ref "docs/get-started/quickstart/yaml_agent" >}})
diff --git a/docs/content/docs/get-started/quickstart/yaml_agent.md
b/docs/content/docs/get-started/quickstart/yaml_agent.md
new file mode 100644
index 00000000..233d7128
--- /dev/null
+++ b/docs/content/docs/get-started/quickstart/yaml_agent.md
@@ -0,0 +1,375 @@
+---
+title: 'YAML Agent'
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Overview
+
+The YAML API lets you declare a Flink Agents application — the agent's
actions, tools, prompts, chat models, and other resources — as a single
declarative file, and load it into an `AgentsExecutionEnvironment` with one
call. Your action and tool implementations still live in Python or Java; the
YAML file only describes how those pieces are wired together.
+
+This quickstart runs the same **Review Analysis** workflow as the [Workflow
Agent Quickstart]({{< ref "docs/get-started/quickstart/workflow_agent" >}}) —
extracting a satisfaction score and dissatisfaction reasons from a stream of
product reviews — but declares the agent in YAML and loads it with `load_yaml`
/ `loadYaml`. The action functions and the `notify_shipping_manager` tool are
reused from the workflow-agent example as static methods, so this quickstart
shows the **smallest possib [...]
+
+For the full reference of the YAML format, see the [YAML API]({{< ref
"docs/development/yaml" >}}) documentation.
+
+## Code Walkthrough
+
+### Declare the Agent in YAML
+
+The whole agent — chat model, prompt, tool, and the two actions — is declared
in `yaml_review_analysis_agent.yaml`. The `function:` strings point at static
methods on the existing `ReviewAnalysisAgent` class from the workflow-agent
quickstart.
+
+{{< tabs "Declare Agent YAML" >}}
+
+{{< tab "Python" >}}
+```yaml
+agents:
+ - name: review_analysis_agent
+ description: |
+ YAML-declared review analysis agent. Reuses the static methods of
+ ReviewAnalysisAgent from the workflow_agent quickstart as actions
+ and tool, but wires everything together through this YAML file
+ instead of class-level decorators.
+
+ actions:
+ - name: process_input
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_input
+ listen_to: [input]
+ - name: process_chat_response
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_chat_response
+ listen_to: [chat_response]
+
+ chat_model_connections:
+ - name: ollama_server
+ clazz: ollama
+ request_timeout: 120
+
+ prompts:
+ - name: review_analysis_prompt
+ messages:
+ - role: system
+ content: |
+ Analyze the user review and product information to determine a
+ satisfaction score (1-5) and potential reasons for
dissatisfaction.
+
+ Example input format:
+ {
+ "id": "12345",
+ "review": "The headphones broke after one week. Very poor
quality."
+ }
+
+ Ensure your response can be parsed by Python JSON, using this
format
+ as an example:
+ {
+ "id": "12345",
+ "score": 1,
+ "reasons": ["poor quality"]
+ }
+
+ If the review mentions shipping dissatisfaction, first call
+ notify_shipping_manager, then return the JSON above with no
+ mention of the tool call.
+ - role: user
+ content: |
+ "input":
+ {input}
+
+ chat_model_setups:
+ - name: review_analysis_model
+ clazz: ollama
+ connection: ollama_server
+ model: qwen3:8b
+ prompt: review_analysis_prompt
+ tools: [notify_shipping_manager]
+ extract_reasoning: true
+
+ tools:
+ - name: notify_shipping_manager
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.notify_shipping_manager
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```yaml
+agents:
+ - name: review_analysis_agent
+ description: |
+ YAML-declared review analysis agent. Reuses the static methods of
+ ReviewAnalysisAgent from the workflow_agent quickstart as actions
+ and tool, but wires everything together through this YAML file
+ instead of class-level annotations.
+
+ actions:
+ - name: processInput
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processInput
+ listen_to: [input]
+ - name: processChatResponse
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processChatResponse
+ listen_to: [chat_response]
+
+ chat_model_connections:
+ - name: ollama_server
+ clazz: ollama
+ type: java
+ endpoint: http://localhost:11434
+ requestTimeout: 120
+
+ prompts:
+ - name: review_analysis_prompt
+ messages:
+ - role: system
+ content: |
+ Analyze the user review and product information to determine a
+ satisfaction score (1-5) and potential reasons for
dissatisfaction.
+
+ Example input format:
+ {
+ "id": "12345",
+ "review": "The headphones broke after one week. Very poor
quality."
+ }
+
+ Ensure your response can be parsed by Java JSON, using this
format
+ as an example:
+ {
+ "id": "12345",
+ "score": 1,
+ "reasons": ["poor quality"]
+ }
+
+ If the review mentions shipping dissatisfaction, first call
+ notifyShippingManager, then return the JSON above with no
+ mention of the tool call.
+ - role: user
+ content: |
+ "input":
+ {input}
+
+ # Action ``processInput`` sends ``ChatRequestEvent("reviewAnalysisModel",
...)``,
+ # so the chat-model setup MUST be named ``reviewAnalysisModel``.
+ chat_model_setups:
+ - name: reviewAnalysisModel
+ clazz: ollama
+ type: java
+ connection: ollama_server
+ model: qwen3:8b
+ prompt: review_analysis_prompt
+ tools: [notifyShippingManager]
+ extract_reasoning: true
+
+ tools:
+ - name: notifyShippingManager
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:notifyShippingManager
+ parameter_types: [java.lang.String, java.lang.String]
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+A few things to notice in the YAML above:
+
+- `clazz: ollama` is an alias resolved by the loader to the full Ollama
chat-model class — see the alias table in the [YAML API]({{< ref
"docs/development/yaml#class-aliases" >}}) doc.
+- `listen_to: [input]` / `[chat_response]` use **event aliases** for the
framework's built-in events.
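+- `function:` strings use the `<module-or-class>:<qualname>` format. In Python
the right side is the class-qualified method name
(`ReviewAnalysisAgent.process_input`); in Java the left side is the
fully-qualified class name and the right side is the bare method name
(`processInput`). Either way, the YAML reuses the static methods the original
`ReviewAnalysisAgent` already defines.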
+- The prompt is declared inline as a `messages:` list and referenced from the
chat-model setup by name.
+
+### Load and Run
+
+In the entry-point script, build the Flink stream as usual, then load the YAML
and apply the agent **by name**.
+
+{{< tabs "Load and Run" >}}
+
+{{< tab "Python" >}}
+```python
+# Set up the Flink streaming environment and the Agents execution environment.
+env = StreamExecutionEnvironment.get_execution_environment()
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+
+# limit async request to avoid overwhelming ollama server
+agents_env.get_config().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2)
+
+# Load the YAML — agents and shared resources are registered on the env.
+agents_env.load_yaml(current_dir / "yaml_review_analysis_agent.yaml")
+
+# Read product reviews from a text file as a streaming source.
+product_review_stream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
+ )
+ .monitor_continuously(Duration.of_minutes(1))
+ .build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="yaml_review_analysis_example",
+).map(lambda x: ProductReview.model_validate_json(x))
+
+# Apply the YAML-declared agent BY NAME.
+review_analysis_res_stream = (
+ agents_env.from_datastream(
+ input=product_review_stream, key_selector=lambda x: x.id
+ )
+ .apply("review_analysis_agent")
+ .to_datastream()
+)
+
+review_analysis_res_stream.print()
+agents_env.execute()
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+// Set up the Flink streaming environment and the Agents execution environment.
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+// limit async request to avoid overwhelming ollama server
+agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2);
+
+// Load the YAML — agents and shared resources are registered on the env.
+File yamlFile = copyResource("yaml/yaml_review_analysis_agent.yaml");
+agentsEnv.loadYaml(yamlFile.toPath());
+
+// Read product reviews from input_data.txt file as a streaming source.
+File inputDataFile = copyResource("input_data.txt");
+DataStream<String> productReviewStream =
+ env.fromSource(
+ FileSource.forRecordStreamFormat(
+ new TextLineInputFormat(),
+ new Path(inputDataFile.getAbsolutePath()))
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "yaml-review-analysis-example");
+
+// Apply the YAML-declared agent BY NAME.
+DataStream<Object> reviewAnalysisResStream =
+ agentsEnv
+ .fromDataStream(productReviewStream)
+ .apply("review_analysis_agent")
+ .toDataStream();
+
+reviewAnalysisResStream.print();
+agentsEnv.execute();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+The key difference from the code-defined workflow agent quickstart is the pair
of calls:
+
+1. `agents_env.load_yaml(...)` / `agentsEnv.loadYaml(...)` — parses the YAML
and registers the declared agent(s) and shared resources on the environment.
+2. `.apply("review_analysis_agent")` — looks the agent up **by name** instead
of passing an `Agent` instance.
+
+Everything else — the Flink source, the key selector, the sink — is identical.
+
+## Run the Example
+
+### Prerequisites
+
+* Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
+* Git
+* Java 11+
+* Python 3.10, 3.11 or 3.12
+
+### Preparation
+
+#### Prepare Flink and Flink Agents
+
+Follow the [installation]({{< ref "docs/get-started/installation" >}})
instructions to set up Flink and Flink Agents.
+
+#### Clone the Flink Agents Repository (if not done already)
+
+```bash
+git clone https://github.com/apache/flink-agents.git
+cd flink-agents
+```
+
+{{< hint info >}}
+For Python examples, you can skip this step and submit the Python file from
the installed flink-agents wheel.
+{{< /hint >}}
+
+#### Deploy a Standalone Flink Cluster
+
+You can deploy a standalone Flink cluster in your local environment with the
following command.
+
+{{< tabs "Deploy a Standalone Flink Cluster" >}}
+
+{{< tab "Python" >}}
+```bash
+export PYTHONPATH=$(python -c 'import sysconfig;
print(sysconfig.get_paths()["purelib"])')
+$FLINK_HOME/bin/start-cluster.sh
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+1. Build Flink Agents from source to generate example jar. See
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
+2. Start the Flink cluster
+ ```bash
+ $FLINK_HOME/bin/start-cluster.sh
+ ```
+
+{{< hint info >}}
+To run the example on JDK 21+, append the JVM option
`--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED` to
[env.java.opts.all](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#env-java-opts-all)
in `$FLINK_HOME/conf/config.yaml` before starting the Flink cluster.
+{{< /hint >}}
+{{< /tab >}}
+
+{{< /tabs >}}
+
+#### Prepare Ollama
+
+Download and install Ollama from the official
[website](https://ollama.com/download).
+
+{{< hint info >}}
+Ollama server **0.9.0** or higher is required.
+{{< /hint >}}
+
+Then pull the `qwen3:8b` model:
+
+```bash
+ollama pull qwen3:8b
+```
+
+### Submit Flink Agents Job to Standalone Flink Cluster
+
+{{< tabs "Submit YAML Example" >}}
+
+{{< tab "Python" >}}
+```bash
+export PYTHONPATH=$(python -c 'import sysconfig;
print(sysconfig.get_paths()["purelib"])')
+
+# Run the YAML-declared review analysis example
+$FLINK_HOME/bin/flink run -py
./flink-agents/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py
+# or submit the example python file in installed flink-agents wheel
+$FLINK_HOME/bin/flink run -py
$PYTHONPATH/flink_agents/examples/quickstart/yaml_workflow_agent_example.py
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```bash
+$FLINK_HOME/bin/flink run -c
org.apache.flink.agents.examples.YamlWorkflowAgentExample
./flink-agents/examples/target/flink-agents-examples-$VERSION.jar
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+You should see a Flink job submitted to the Flink Cluster in the Flink web UI
at [localhost:8081](http://localhost:8081). After a few minutes, the analysis
results — one JSON record per input review — appear in the TaskManager output
log.
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java
b/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java
new file mode 100644
index 00000000..60cfbb25
--- /dev/null
+++
b/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.examples;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.File;
+
+/**
+ * Java example demonstrating a YAML-declared workflow agent.
+ *
+ * <p>The agent (chat model, prompt, tool, and actions) is declared in {@code
+ * yaml_review_analysis_agent.yaml} on the classpath. The YAML file's {@code
function:} fields point
+ * at the static methods of {@link
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent}
+ * already defined for the code-only quickstart, so this example demonstrates
the minimal delta
+ * between a code-defined and a YAML-declared agent.
+ *
+ * <p>Pipeline:
+ *
+ * <pre>{@code
+ * FileSource (input_data.txt) -> AgentsEnv.apply("review_analysis_agent") ->
print
+ * }</pre>
+ */
+public class YamlWorkflowAgentExample {
+
+ /** Runs the example pipeline. */
+ public static void main(String[] args) throws Exception {
+ // Set up the Flink streaming environment and the Agents execution
environment.
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+ // limit async request to avoid overwhelming ollama server
+ agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2);
+
+ // Load the YAML — the declared agent and its chat-model connection,
+ // chat-model setup, prompt, tool, and actions are all registered on
+ // the environment in this single call.
+ File yamlFile =
+
WorkflowSingleAgentExample.copyResource("yaml/yaml_review_analysis_agent.yaml");
+ agentsEnv.loadYaml(yamlFile.toPath());
+
+ // Read product reviews from input_data.txt file as a streaming source.
+ // Each element represents a ProductReview.
+ File inputDataFile =
WorkflowSingleAgentExample.copyResource("input_data.txt");
+ DataStream<String> productReviewStream =
+ env.fromSource(
+ FileSource.forRecordStreamFormat(
+ new TextLineInputFormat(),
+ new
Path(inputDataFile.getAbsolutePath()))
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "yaml-review-analysis-example");
+
+ // Apply the YAML-declared agent BY NAME — ``apply`` accepts either an
+ // Agent instance or the name of an agent registered on the
environment.
+ DataStream<Object> reviewAnalysisResStream =
+ agentsEnv
+ .fromDataStream(productReviewStream)
+ .apply("review_analysis_agent")
+ .toDataStream();
+
+ // Print the analysis results to stdout.
+ reviewAnalysisResStream.print();
+
+ // Execute the Flink pipeline.
+ agentsEnv.execute();
+ }
+}
diff --git a/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml
b/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml
new file mode 100644
index 00000000..723c1df9
--- /dev/null
+++ b/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml
@@ -0,0 +1,72 @@
+agents:
+ - name: review_analysis_agent
+ description: |
+ YAML-declared review analysis agent. Reuses the static methods of
+ ReviewAnalysisAgent from the workflow_agent quickstart as actions
+ and tool, but wires everything together through this YAML file
+ instead of class-level annotations.
+
+ actions:
+ - name: processInput
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processInput
+ listen_to: [input]
+ - name: processChatResponse
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processChatResponse
+ listen_to: [chat_response]
+
+ chat_model_connections:
+ - name: ollama_server
+ clazz: ollama
+ type: java
+ endpoint: http://localhost:11434
+ requestTimeout: 120
+
+ prompts:
+ - name: review_analysis_prompt
+ messages:
+ - role: system
+ content: |
+ Analyze the user review and product information to determine a
+ satisfaction score (1-5) and potential reasons for
dissatisfaction.
+
+ Example input format:
+ {
+ "id": "12345",
+ "review": "The headphones broke after one week. Very poor
quality."
+ }
+
+ Ensure your response can be parsed by Java JSON, using this
+ format as an example:
+ {
+ "id": "12345",
+ "score": 1,
+ "reasons": ["poor quality"]
+ }
+
+ If the review mentions shipping dissatisfaction, first call
+ notifyShippingManager, then return the JSON above with no
+ mention of the tool call.
+ - role: user
+ content: |
+ "input":
+ {input}
+
+ # Action ``processInput`` sends ``ChatRequestEvent("reviewAnalysisModel",
...)``,
+ # so the chat-model setup MUST be named ``reviewAnalysisModel``.
+ chat_model_setups:
+ - name: reviewAnalysisModel
+ clazz: ollama
+ type: java
+ connection: ollama_server
+ model: qwen3:8b
+ prompt: review_analysis_prompt
+ tools: [notifyShippingManager]
+ extract_reasoning: true
+
+ tools:
+ - name: notifyShippingManager
+ type: java
+ function:
org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:notifyShippingManager
+ parameter_types: [java.lang.String, java.lang.String]
diff --git
a/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml
b/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml
new file mode 100644
index 00000000..f6aff0a8
--- /dev/null
+++ b/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml
@@ -0,0 +1,65 @@
+agents:
+ - name: review_analysis_agent
+ description: |
+ YAML-declared review analysis agent. Reuses the static methods of
+ ReviewAnalysisAgent from the workflow_agent quickstart as actions
+ and tool, but wires everything together through this YAML file
+ instead of class-level decorators.
+
+ actions:
+ - name: process_input
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_input
+ listen_to: [input]
+ - name: process_chat_response
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_chat_response
+ listen_to: [chat_response]
+
+ chat_model_connections:
+ - name: ollama_server
+ clazz: ollama
+ request_timeout: 120
+
+ prompts:
+ - name: review_analysis_prompt
+ messages:
+ - role: system
+ content: |
+ Analyze the user review and product information to determine a
+ satisfaction score (1-5) and potential reasons for
dissatisfaction.
+
+ Example input format:
+ {
+ "id": "12345",
+ "review": "The headphones broke after one week. Very poor
quality."
+ }
+
+ Ensure your response can be parsed by Python JSON, using this
+ format as an example:
+ {
+ "id": "12345",
+ "score": 1,
+ "reasons": ["poor quality"]
+ }
+
+ If the review mentions shipping dissatisfaction, first call
+ notify_shipping_manager, then return the JSON above with no
+ mention of the tool call.
+ - role: user
+ content: |
+ "input":
+ {input}
+
+ # Action ``process_input`` sends
``ChatRequestEvent(model="review_analysis_model", ...)``,
+ # so the chat-model setup MUST be named ``review_analysis_model``.
+ chat_model_setups:
+ - name: review_analysis_model
+ clazz: ollama
+ connection: ollama_server
+ model: qwen3:8b
+ prompt: review_analysis_prompt
+ tools: [notify_shipping_manager]
+ extract_reasoning: true
+
+ tools:
+ - name: notify_shipping_manager
+ function:
flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.notify_shipping_manager
diff --git
a/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py
b/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py
new file mode 100644
index 00000000..9291de66
--- /dev/null
+++ b/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py
@@ -0,0 +1,88 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+
+from pyflink.common import Duration, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
+
+from flink_agents.api.core_options import AgentExecutionOptions
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import
(
+ ProductReview,
+)
+
+current_dir = Path(__file__).parent
+
+
+def main() -> None:
+ """Run the product review analysis quickstart with the agent declared in
YAML.
+
+ The agent (chat model, prompt, tool, and actions) is declared in
+ ``yaml_review_analysis_agent.yaml``. The YAML file's ``function:``
+ fields point at the static methods of ``ReviewAnalysisAgent`` already
+ defined for the code-only quickstart, so this example demonstrates the
+ minimal delta between a code-defined and a YAML-declared agent.
+ """
+ # Set up the Flink streaming environment and the Agents execution
environment.
+ env = StreamExecutionEnvironment.get_execution_environment()
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+
+ # limit async request to avoid overwhelming ollama server
+ agents_env.get_config().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2)
+
+ # Load the YAML — the declared agent and its chat-model connection,
+ # chat-model setup, prompt, tool, and actions are all registered on
+ # the environment in this single call.
+ agents_env.load_yaml(current_dir / "yaml_review_analysis_agent.yaml")
+
+ # Read product reviews from a text file as a streaming source.
+ # Each line in the file should be a JSON string representing a
ProductReview.
+ product_review_stream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
+ )
+ .monitor_continuously(Duration.of_minutes(1))
+ .build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="yaml_review_analysis_example",
+ ).map(
+ lambda x: ProductReview.model_validate_json(
+ x
+ ) # Deserialize JSON to ProductReview.
+ )
+
+ # Apply the YAML-declared agent BY NAME — ``apply`` accepts either an
+ # Agent instance or the name of an agent registered on the environment.
+ review_analysis_res_stream = (
+ agents_env.from_datastream(
+ input=product_review_stream, key_selector=lambda x: x.id
+ )
+ .apply("review_analysis_agent")
+ .to_datastream()
+ )
+
+ # Print the analysis results to stdout.
+ review_analysis_res_stream.print()
+
+ # Execute the Flink pipeline.
+ agents_env.execute()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/python/pyproject.toml b/python/pyproject.toml
index de969a15..0c8f74e5 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -70,6 +70,7 @@ exclude = ["_build_backend*"]
[tool.setuptools.package-data]
"flink_agents.lib" = ["**/*.jar"]
+"flink_agents.examples.quickstart" = ["**/*.yaml"]
"flink_agents.examples.quickstart.resources" = ["**/*.txt"]
# Optional dependencies (dependency groups)