Sxnan commented on code in PR #1:
URL: https://github.com/apache/flink-agents-demos/pull/1#discussion_r2944095759
##########
flink-operations-agent-demo/bin/internal/consume_operations_record.sh:
##########
@@ -0,0 +1,141 @@
+#!/bin/bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Consume diagnosis results from Kafka and save as markdown files
+# Usage: ./consume_operations_record.sh [output_dir]
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+BIN_DIR="$(dirname "$SCRIPT_DIR")"
+PROJECT_ROOT="$(dirname "$BIN_DIR")"
+OUTPUT_DIR="${1:-$PROJECT_ROOT/tmp/operations_record}"
+
+cd "$PROJECT_ROOT"
+
+source venv/bin/activate
+
+# Ensure output directory exists and remove existing files
+if [ -d "$OUTPUT_DIR" ]; then
+    rm -rf "$OUTPUT_DIR"
+fi
+mkdir -p "$OUTPUT_DIR/normal"

Review Comment:
   This is inconsistent with the README, where it says:
   > **Diagnosis Results:** Saved to `tmp/diagnosis-result/` directory, organized by health status:
   > - `healthy/` - Jobs with no issues
   > - `auto_resolved/` - Issues fixed automatically by the agent
   > - `requires_action/` - Issues requiring manual intervention

##########
flink-operations-agent-demo/bin/stop_all.sh:
##########
@@ -0,0 +1,77 @@
+#!/bin/bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Stop all services for diagnosis-agent-demo
+# Usage: ./stop_all.sh
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
+
+echo "============================================================"
+echo " Stopping All Services for Diagnosis Agent Demo"
+echo "============================================================"
+echo ""
+
+# Step 1: Stop watermark collector
+echo "[Step 1/3] Stopping watermark collector..."
+echo "------------------------------------------------------------"
+"$SCRIPT_DIR/internal/start_metric_collector.sh" --stop
+if [ $? -ne 0 ]; then
+    echo "Warning: Failed to stop watermark collector (may not be running)"
+fi
+echo ""
+
+# Step 2: Stop auto-send job info
+echo "[Step 2/3] Stopping auto-send job info..."
+echo "------------------------------------------------------------"
+"$SCRIPT_DIR/internal/auto_send_job_info.sh" --stop
+if [ $? -ne 0 ]; then
+    echo "Warning: Failed to stop auto-send job info (may not be running)"
+fi
+echo ""
+
+# Step 3: Stop Flink cluster
+echo "[Step 3/3] Stopping Flink cluster..."
+echo "------------------------------------------------------------"
+if [ -d "$PROJECT_DIR/flink-1.20.3" ]; then
+    echo "Stopping Flink cluster..."
+    "$PROJECT_DIR/flink-1.20.3/bin/stop-cluster.sh"
+    if [ $? -ne 0 ]; then
+        echo "Warning: Failed to stop Flink cluster"
+    fi
+
+    echo "Stopping TaskManager..."
+    "$PROJECT_DIR/flink-1.20.3/bin/taskmanager.sh" stop
+    if [ $? -ne 0 ]; then
+        echo "Warning: Failed to stop TaskManager"
+    fi
+    echo "Flink cluster stopped"
+else
+    echo "Warning: Flink directory not found ($PROJECT_DIR/flink-1.20.3)"
+fi
+echo ""

Review Comment:
   The Kafka and Elasticsearch containers are not stopped. Is this expected?

##########
flink-operations-agent-demo/operations-agent-job/operations_agent.py:
##########
@@ -0,0 +1,404 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+import os
+from datetime import datetime
+
+from flink_agents.api.agents.agent import Agent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import (
+    action,
+    chat_model_connection,
+    chat_model_setup,
+    embedding_model_connection,
+    embedding_model_setup,
+    prompt,
+    vector_store,
+)
+from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent
+from flink_agents.api.events.context_retrieval_event import ContextRetrievalRequestEvent, ContextRetrievalResponseEvent
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+
+from custom_types_and_prompts import (
+    JobInfo,
+    ProblemDiagnosisResult,
+    ProblemIdentificationResult,
+    ProblemRemedyRequestEvent,
+    ProblemRemedyResult,
+    diagnosis_prompt,
+    problem_identification_prompt,
+    remedy_prompt,
+)
+from tools.operations_agent_tools import OperationsAgentTools
+
+
+class FlinkJobOperationsAgent(Agent):
+    """Flink Agent for comprehensive job operations.
+
+    This agent performs multi-dimensional analysis of Flink jobs including:
+    - Resource analysis
+    - Job analysis
+    - Exception analysis
+    """
+
+    @prompt
+    @staticmethod
+    def problem_identification_prompt() -> Prompt:
+        """System prompt for Flink job problem identification."""
+        return problem_identification_prompt
+
+    @prompt
+    @staticmethod
+    def diagnosis_prompt() -> Prompt:
+        """System prompt for Flink job diagnosis analysis."""
+        return diagnosis_prompt
+
+    @prompt
+    @staticmethod
+    def remedy_prompt() -> Prompt:
+        """System prompt for Flink job remedy."""
+        return remedy_prompt
+
+    @chat_model_connection
+    @staticmethod
+    def tongyi_connection() -> ResourceDescriptor:
+        """TongyiChatModel connection for Flink job problem identification."""
+        api_key = os.getenv("DASHSCOPE_API_KEY")
+        if not api_key:
+            err_msg = "DASHSCOPE_API_KEY environment variable is not set. Please set it in your .env file or environment."
+            raise ValueError(err_msg)
+        return ResourceDescriptor(clazz=ResourceName.ChatModel.TONGYI_CONNECTION, api_key=api_key, request_timeout=60.0)
+
+    @chat_model_setup
+    @staticmethod
+    def problem_identification_chat_model() -> ResourceDescriptor:
+        """ChatModel setup for Flink job problem identification."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.TONGYI_SETUP,
+            connection="tongyi_connection",
+            model="qwen-flash",
+            prompt="problem_identification_prompt",
+            extract_reasoning=True,
+            tools=OperationsAgentTools.get_problem_identification_tool_names(),
+        )
+
+    @chat_model_setup
+    @staticmethod
+    def diagnosis_chat_model() -> ResourceDescriptor:
+        """ChatModel setup for Flink job diagnosis."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.TONGYI_SETUP,
+            connection="tongyi_connection",
+            model="qwen-flash",
+            prompt="diagnosis_prompt",
+            extract_reasoning=True,
+            tools=OperationsAgentTools.get_diagnosis_tool_names(),
+        )
+
+    @chat_model_setup
+    @staticmethod
+    def remedy_chat_model() -> ResourceDescriptor:
+        """ChatModel setup for Flink job remedy."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.TONGYI_SETUP,
+            connection="tongyi_connection",
+            model="qwen-flash",
+            prompt="remedy_prompt",
+            extract_reasoning=True,
+            tools=OperationsAgentTools.get_remedy_tool_names(),
+        )
+
+    @embedding_model_connection
+    @staticmethod
+    def embedding_model_connection() -> ResourceDescriptor:
+        """EmbeddingModelConnection responsible for the Ollama model service connection."""
+        return ResourceDescriptor(
+            clazz=ResourceName.EmbeddingModel.OLLAMA_CONNECTION,
+            host="http://localhost:11434",
+        )
+
+    @embedding_model_setup
+    @staticmethod
+    def embedding_model() -> ResourceDescriptor:
+        """EmbeddingModel setup that reuses the Ollama connection."""
+        return ResourceDescriptor(
+            clazz=ResourceName.EmbeddingModel.OLLAMA_SETUP,
+            connection="embedding_model_connection",
+            model=os.environ.get("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text:latest"),
+        )
+
+    @vector_store
+    @staticmethod
+    def vector_store() -> ResourceDescriptor:
+        """Vector store setup for knowledge base."""
+        return ResourceDescriptor(
+            clazz=ResourceName.VectorStore.JAVA_WRAPPER_COLLECTION_MANAGEABLE_VECTOR_STORE,
+            java_clazz=ResourceName.VectorStore.Java.ELASTICSEARCH_VECTOR_STORE,
+            embedding_model="embedding_model",
+            host=os.environ.get("ES_HOST"),

Review Comment:
   Where do we set the environment variable "ES_HOST"?

##########
flink-operations-agent-demo/README.md:
##########
@@ -0,0 +1,250 @@
+# Flink Operations Agent Demo
+
+A demonstration project for a Flink operations agent that automatically analyzes Flink job health, provides diagnostic recommendations, and can attempt to fix issues using available tools.
+
+## Overview
+
+This project showcases an intelligent Flink job diagnosis system that:
+- Detects Flink job health issues (backpressure, failures, etc.)
+- Provides AI-powered diagnostic analysis and recommendations
+- **Automatically attempts to fix issues** using available tools (restart jobs, adjust configurations, etc.)
+- Integrates with Kafka for event streaming and Elasticsearch for SOP storage
+
+### Architecture
+
+<p align="center">
+  <img src="bin/internal/architecture.png" alt="Architecture Overview" width="500"/>
+</p>
+
+The architecture has two layers: a **Platform Layer** at the top, and a **Cluster Layer** at the bottom.
+
+#### Cluster Layer
+
+At the bottom sits a standalone Flink cluster running multiple jobs. What makes it special: an **Operations Agent** is also deployed here — running as a Flink job itself.
+
+#### Platform Layer
+
+At the top live the scripts that simulate platform capabilities. This is where users interact with the system — submitting and managing jobs.
+
+#### How the Two Layers Communicate
+
+The Agent reaches up — calling tools, accessing the platform, gathering information, executing operations. The platform reaches down — triggering operation events, sometimes manually when a user requests it, sometimes automatically as part of routine inspections.
+
+#### Inside the Agent: Four-Phase Pipeline
+
+<p align="center">
+  <img src="bin/internal/inside_agent.png" alt="Inside the Agent" width="800"/>
+</p>
+
+When the Agent receives information about a Flink job, it runs through four phases:
+
+1. **Problem Identification** — The Agent examines the job using fundamental information and universal knowledge. Is there an anomaly? If so, it moves to the next phase.
+
+2. **SOP Retrieval** — The Agent searches the **SOP Vector Store** (Elasticsearch) for relevant experience and knowledge about this specific anomaly, using embedding similarity search to surface the most applicable procedures.
+
+3. **Problem Diagnosis** — Armed with the retrieved SOPs, the Agent goes deeper: multiple rounds of tool calls and in-depth investigation using **LLM Chat Models** to determine whether the problem needs fixing.
+
+4. **Problem Remedy** — If it does, the Agent acts. Based on the diagnosis and the tools available, it either executes the fix automatically (e.g., restart jobs, adjust configurations) or surfaces what should be done.
+
+Finally, the system outputs a complete **operations record** and feeds it back to the platform.
+
+## Prerequisites
+
+- **Docker**: For running Kafka and Elasticsearch
+- **Ollama**: For generating embeddings (install from https://ollama.ai)
+  - Required model: `nomic-embed-text:latest`
+  - Run: `ollama pull nomic-embed-text:latest`
+- **Python 3.10 or 3.11**: For PyFlink and the operations agent
+- **Java 11+**: For building and running Flink jobs
+- **Maven**: For building sample jobs
+- **curl**: For downloading dependencies
+- **DashScope API Key**: For AI-powered diagnosis
+  - Sign up at https://dashscope.aliyun.com
+  - Set environment variable: `export DASHSCOPE_API_KEY=your_api_key_here`
+  - If you prefer not to use Tongyi, you can modify the chat model section in `operations-agent-job/diagnosis_agent.py`. We didn't use Ollama by default because smaller models have poor demo performance, while larger models run slowly on personal computers. For more chat model options, see the [Flink Agents Documentation](https://nightlies.apache.org/flink/flink-agents-docs-release-0.2/docs/development/chat_models/).

Review Comment:
   `operations-agent-job/diagnosis_agent.py` does not exist

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
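
---

Regarding the `stop_all.sh` review comment (Kafka and Elasticsearch containers are left running): a minimal sketch of an extra stop step. The container names `kafka` and `elasticsearch` are assumptions, not taken from the PR; adjust them to whatever names the demo's start scripts actually use for `docker run`.

```shell
#!/bin/sh
# Hypothetical addition to stop_all.sh. Stops a Docker container by name if it
# is running; the names "kafka" and "elasticsearch" below are assumed.
stop_container() {
    name="$1"
    if ! command -v docker >/dev/null 2>&1; then
        echo "Warning: docker not found; skipping ${name}"
        return 0
    fi
    # docker ps lists only running containers; -x matches the whole name.
    if docker ps --format '{{.Names}}' | grep -qx "${name}"; then
        echo "Stopping ${name} container..."
        docker stop "${name}" || echo "Warning: Failed to stop ${name}"
    else
        echo "Info: ${name} container is not running"
    fi
}

stop_container kafka
stop_container elasticsearch
```

Mirroring the script's existing step style, each failure is reported as a warning rather than aborting, so the remaining services are still stopped.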
