xintongsong commented on code in PR #237:
URL: https://github.com/apache/flink-agents/pull/237#discussion_r2388036098
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
Review Comment:
Should mention that local executor is for python only.
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
Review Comment:
This is hard to understand. What is multi-session management? What is
auto-generated UUID? We really need to keep it simple, and should not confuse
users with unnecessary complexity.
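For reference, the fallback order the comment is questioning could be sketched as follows. This is a hypothetical illustration of the documented priority only, not the actual runner code:

```python
import uuid

def resolve_context_key(record: dict) -> str:
    # Documented priority: an explicit "key" wins, then the "k"
    # shorthand, otherwise a fresh UUID string is generated.
    if "key" in record:
        return record["key"]
    if "k" in record:
        return record["k"]
    return str(uuid.uuid4())
```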
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
Review Comment:
Can we have some examples?
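A few concrete input records following the structure above might look like this (the session keys and prompts are illustrative):

```python
input_data = [
    # Plain-text prompt with an explicit session key.
    {"key": "user_42", "value": "Summarize my last three orders."},
    # The same fields using the "k"/"v" shorthand names.
    {"k": "user_43", "v": "What is the shipping status?"},
    # The value can be any serializable type, e.g. a dict.
    {"key": "user_44", "value": {"numbers": [1, 2], "op": "sum"}},
]
```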
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
+ },
+ ...
+]
+``````
+
+#### Output Data Format
+
+The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each
dictionary contains a single key-value pair representing the processed result.
The structure is generated from `OutputEvent` objects:
+
+``````python
+[
+ {key_1: output_1}, # From first OutputEvent
+ {key_2: output_2}, # From second OutputEvent
+ ...
+]
+``````
+
+Each dictionary in the output list follows this pattern:
+
+``````
+{
+ # Key: Matches the input context key (from "key"/"k" field or
auto-generated UUID)
+ # Value: Result from agent processing (type depends on implementation)
+ <context_key>: <processed_output>
+}
+``````
+
+### Example for Local Run with Test Data
+
+``````python
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from my_module.agents import MyAgent # Replace with your actual agent path
+
+if __name__ == "__main__":
+ # 1. Initialize environment
+ env = AgentsExecutionEnvironment.get_execution_environment()
+
+ # 2. Prepare test data
+ input_data = [
+ {"key": "0001", "value": "Calculate the sum of 1 and 2."},
+ {"key": "0002", "value": "Tell me a joke about cats."}
+ ]
+
+ # 3. Create agent instance
+ agent = MyAgent()
+
+ # 4. Build pipeline
+ output_data = env.from_list(input_data) \
+ .apply(agent) \
+ .to_list()
+
+ # 5. Execute and show results
+ env.execute()
+
+ print("\nExecution Results:")
+ for record in output_data:
+ for key, value in record.items():
+ print(f"{key}: {value}")
+
+``````
## Local Run with Flink MiniCluster
-{{< hint warning >}}
-**TODO**: How to run with Flink MiniCluster locally.
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a **Flink MiniCluster embedded in Python**. This allows you to
simulate a real Flink streaming environment without deploying to a full
cluster. For more details about how to integrate agents with Flink's
`DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref
"docs/development/integrate_with_flink" >}}) documentation.
+
+{{< hint info >}}
+
+If you encounter the exception `No module named 'flink_agents'` when running with Flink MiniCluster, you can set the `PYTHONPATH` by adding `os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]` at the beginning of your code.
+
{{< /hint >}}
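The workaround mentioned in the hint, as a complete snippet. This assumes `flink_agents` is installed into the current interpreter's site-packages:

```python
import os
import sysconfig

# Point the MiniCluster's Python workers at this interpreter's
# site-packages directory so the flink_agents package can be found.
os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
```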
## Run in Flink Cluster
-{{< hint warning >}}
-**TODO**: How to run in Flink Cluster.
-{{< /hint >}}
\ No newline at end of file
+Flink Agents jobs are deployed and run on the cluster in the same way as PyFlink jobs. You can refer to the instructions for [submitting PyFlink jobs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more detailed steps. The following explains how to submit Flink Agents jobs to the cluster, using a Standalone cluster as an example.
+
+### Prepare Flink Agents
+
+We recommend creating a Python virtual environment to install the Flink Agents Python library.
+
+Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to
install the Flink Agents Python and Java libraries.
+
+### Deploy a Standalone Flink Cluster
+
+Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed steps.
+
+```bash
+curl -LO
https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz
+tar -xzf flink-1.20.3-bin-scala_2.12.tgz
+```
+
+Deploy a standalone Flink cluster in your local environment with the following
command.
+
+```bash
+./flink-1.20.3/bin/start-cluster.sh
+```
+
+You should be able to navigate to the web UI at [localhost:8081](http://localhost:8081) to view the Flink dashboard and see that the cluster is up and running.
Review Comment:
It doesn't have to be a standalone cluster. And why do we need to teach our
users how to start a flink cluster in the documentation of flink-agents?
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
+ },
+ ...
+]
+``````
+
+#### Output Data Format
+
+The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each
dictionary contains a single key-value pair representing the processed result.
The structure is generated from `OutputEvent` objects:
+
+``````python
+[
+ {key_1: output_1}, # From first OutputEvent
+ {key_2: output_2}, # From second OutputEvent
+ ...
+]
+``````
+
+Each dictionary in the output list follows this pattern:
+
+``````
+{
+ # Key: Matches the input context key (from "key"/"k" field or
auto-generated UUID)
+ # Value: Result from agent processing (type depends on implementation)
+ <context_key>: <processed_output>
+}
+``````
+
+### Example for Local Run with Test Data
+
+``````python
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from my_module.agents import MyAgent # Replace with your actual agent path
+
+if __name__ == "__main__":
+ # 1. Initialize environment
+ env = AgentsExecutionEnvironment.get_execution_environment()
+
+ # 2. Prepare test data
+ input_data = [
+ {"key": "0001", "value": "Calculate the sum of 1 and 2."},
+ {"key": "0002", "value": "Tell me a joke about cats."}
+ ]
+
+ # 3. Create agent instance
+ agent = MyAgent()
+
+ # 4. Build pipeline
+ output_data = env.from_list(input_data) \
+ .apply(agent) \
+ .to_list()
+
+ # 5. Execute and show results
+ env.execute()
+
+ print("\nExecution Results:")
+ for record in output_data:
+ for key, value in record.items():
+ print(f"{key}: {value}")
+
+``````
## Local Run with Flink MiniCluster
-{{< hint warning >}}
-**TODO**: How to run with Flink MiniCluster locally.
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a **Flink MiniCluster embedded in Python**. This allows you to
simulate a real Flink streaming environment without deploying to a full
cluster. For more details about how to integrate agents with Flink's
`DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref
"docs/development/integrate_with_flink" >}}) documentation.
Review Comment:
It's not a simulation.
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
+ },
+ ...
+]
+``````
+
+#### Output Data Format
+
+The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each
dictionary contains a single key-value pair representing the processed result.
The structure is generated from `OutputEvent` objects:
+
+``````python
+[
+ {key_1: output_1}, # From first OutputEvent
+ {key_2: output_2}, # From second OutputEvent
+ ...
+]
+``````
+
+Each dictionary in the output list follows this pattern:
+
+``````
+{
+ # Key: Matches the input context key (from "key"/"k" field or
auto-generated UUID)
+ # Value: Result from agent processing (type depends on implementation)
+ <context_key>: <processed_output>
+}
+``````
+
+### Example for Local Run with Test Data
Review Comment:
Ok, here comes the example. But normally, we don't look for examples in another section. Maybe put the example before the input and output sections? Otherwise it's super confusing; we don't even know what the inputs and outputs are when you start talking about their formats.
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
Review Comment:
Before introducing each deployment mode, I think we need an opening paragraph to briefly introduce that there are three ways of executing the agents, what the differences are, and how to choose between them.
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
+ },
+ ...
+]
+``````
+
+#### Output Data Format
+
+The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each
dictionary contains a single key-value pair representing the processed result.
The structure is generated from `OutputEvent` objects:
+
+``````python
+[
+ {key_1: output_1}, # From first OutputEvent
+ {key_2: output_2}, # From second OutputEvent
+ ...
+]
+``````
+
+Each dictionary in the output list follows this pattern:
+
+``````
+{
+ # Key: Matches the input context key (from "key"/"k" field or
auto-generated UUID)
+ # Value: Result from agent processing (type depends on implementation)
+ <context_key>: <processed_output>
+}
+``````
+
+### Example for Local Run with Test Data
+
+``````python
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from my_module.agents import MyAgent # Replace with your actual agent path
+
+if __name__ == "__main__":
+ # 1. Initialize environment
+ env = AgentsExecutionEnvironment.get_execution_environment()
+
+ # 2. Prepare test data
+ input_data = [
+ {"key": "0001", "value": "Calculate the sum of 1 and 2."},
+ {"key": "0002", "value": "Tell me a joke about cats."}
+ ]
+
+ # 3. Create agent instance
+ agent = MyAgent()
+
+ # 4. Build pipeline
+ output_data = env.from_list(input_data) \
+ .apply(agent) \
+ .to_list()
+
+ # 5. Execute and show results
+ env.execute()
+
+ print("\nExecution Results:")
+ for record in output_data:
+ for key, value in record.items():
+ print(f"{key}: {value}")
+
+``````
## Local Run with Flink MiniCluster
-{{< hint warning >}}
-**TODO**: How to run with Flink MiniCluster locally.
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a **Flink MiniCluster embedded in Python**. This allows you to
simulate a real Flink streaming environment without deploying to a full
cluster. For more details about how to integrate agents with Flink's
`DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref
"docs/development/integrate_with_flink" >}}) documentation.
Review Comment:
Why only for python?
##########
docs/content/docs/operations/deployment.md:
##########
@@ -24,18 +24,141 @@ under the License.
## Local Run with Test Data
-{{< hint warning >}}
-**TODO**: How to run with test data with LocalExecutorEnvironment.
-{{< /hint >}}
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a simple Python script. This allows you to validate logic without
requiring a Flink cluster.
+
+### Key Features
+
+- **No Flink Required**: Local execution is ideal for development and testing.
+- **Test Data Simulation**: Easily inject mock inputs for validation.
+- **IDE Compatibility**: Run directly in your preferred development
environment.
+
+### Data Format
+
+#### Input Data Format
+
+The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with
the following structure:
+
+``````python
+[
+ {
+ # Optional field: Context key for multi-session management
+ # Priority order: "key" > "k" > auto-generated UUID
+ "key": "session_001", # or use shorthand "k": "session_001"
+
+ # Required field: Input content (supports text, JSON, or any
serializable type)
+ # This becomes the `input` field in InputEvent
+ "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
+ },
+ ...
+]
+``````
+
+#### Output Data Format
+
+The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each
dictionary contains a single key-value pair representing the processed result.
The structure is generated from `OutputEvent` objects:
+
+``````python
+[
+ {key_1: output_1}, # From first OutputEvent
+ {key_2: output_2}, # From second OutputEvent
+ ...
+]
+``````
+
+Each dictionary in the output list follows this pattern:
+
+``````
+{
+ # Key: Matches the input context key (from "key"/"k" field or
auto-generated UUID)
+ # Value: Result from agent processing (type depends on implementation)
+ <context_key>: <processed_output>
+}
+``````
+
+### Example for Local Run with Test Data
+
+``````python
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from my_module.agents import MyAgent # Replace with your actual agent path
+
+if __name__ == "__main__":
+ # 1. Initialize environment
+ env = AgentsExecutionEnvironment.get_execution_environment()
+
+ # 2. Prepare test data
+ input_data = [
+ {"key": "0001", "value": "Calculate the sum of 1 and 2."},
+ {"key": "0002", "value": "Tell me a joke about cats."}
+ ]
+
+ # 3. Create agent instance
+ agent = MyAgent()
+
+ # 4. Build pipeline
+ output_data = env.from_list(input_data) \
+ .apply(agent) \
+ .to_list()
+
+ # 5. Execute and show results
+ env.execute()
+
+ print("\nExecution Results:")
+ for record in output_data:
+ for key, value in record.items():
+ print(f"{key}: {value}")
+
+``````
## Local Run with Flink MiniCluster
-{{< hint warning >}}
-**TODO**: How to run with Flink MiniCluster locally.
+After completing the [installation of flink-agents]({{< ref
"docs/get-started/installation" >}}) and [building your agent]({{< ref
"docs/development/workflow_agent" >}}), you can test and execute your agent
locally using a **Flink MiniCluster embedded in Python**. This allows you to
simulate a real Flink streaming environment without deploying to a full
cluster. For more details about how to integrate agents with Flink's
`DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref
"docs/development/integrate_with_flink" >}}) documentation.
+
+{{< hint info >}}
+
If you encounter the exception `No module named 'flink_agents'` when running with Flink MiniCluster, you can set the `PYTHONPATH` by adding `os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]` at the beginning of your code.
Review Comment:
Why do we have this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]