This is an automated email from the ASF dual-hosted git repository.
robocanic pushed a commit to branch ai
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/ai by this push:
new 8c250513 [feat(ai)]: refactor AI agent for extensibility and add
streaming support (#1323)
8c250513 is described below
commit 8c250513697457d126f09ada10aa89df9c9e1ee6
Author: liwener <[email protected]>
AuthorDate: Fri Sep 19 18:00:21 2025 +0800
[feat(ai)]: refactor AI agent for extensibility and add streaming support
(#1323)
* [feat]: Integrating AI Agent Capabilities into dubbo-admin
A preliminary prototype of an agent designed to help users diagnose issues
in dubbo microservices.
* [chore(manager)]: Move function to agent package
* [chore]: Delete useless prompt
* [chore]: move config package to internal
* [chore]: remove unnecessary nil explicit initialization
* [feat]: Enable support for the Alibaba DashScope OpenAI-Compatible API
* [chore]: Abstract models of different providers into Model class
* [refactor]: abstract Agent, Flow, and Tool input/output
- Abstract the Agent interface
- Unify data schema and interfaces of Agent flows
- Unify Tool input and output data structures
- Simplify usage of Agent
* [feature]: Add simple chat history memory
* [chore]: Add support of stream output
* [chore]: Convert Chinese comments to English
* [chore]: replace genkit with fixed custom package
* [chore]: replace panic with error in String()
* [chore]: remove useless message package
* [feat]: use channel to get stream chunk and output
* [chore(ai/agent)]: move memory context initialization into agent
* [chore(ai/agent)]: fix bad channel usage in `react.Interact()`
The old implementation could cause re-closing the channel.
---
ai/.gitignore | 5 +-
ai/README.md | 0
ai/agent/agent.go | 194 ++++++++++++++
ai/agent/react/react.go | 243 +++++++++++++++++
ai/agent/react/react_test.go | 190 ++++++++++++++
ai/config/config.go | 27 ++
ai/go.mod | 36 ++-
ai/go.sum | 82 +++++-
ai/internal/agent/flow.go | 182 -------------
ai/internal/config/config.go | 25 --
ai/internal/manager/manager.go | 128 +++++----
ai/internal/memory/history.go | 44 ++++
ai/internal/memory/memory.go | 482 ++++++++++++++++++++++++++++++++++
ai/internal/schema/llm_schema.go | 102 -------
ai/internal/tools/mock_tools.go | 474 +++++++++++++++++++--------------
ai/internal/tools/tools.go | 94 +++++--
ai/main.go | 20 +-
ai/plugins/dashscope/dashscope.go | 95 +++----
ai/plugins/model/model.go | 32 +++
ai/plugins/siliconflow/siliconflow.go | 88 +++----
ai/schema/react_schema.go | 136 ++++++++++
ai/schema/schema.go | 17 ++
ai/test/flow_test.go | 107 --------
ai/test/llm_test.go | 16 +-
ai/test/test.go | 12 -
ai/utils/utils.go | 18 +-
26 files changed, 2002 insertions(+), 847 deletions(-)
diff --git a/ai/.gitignore b/ai/.gitignore
index a96daa9e..85bc0c29 100644
--- a/ai/.gitignore
+++ b/ai/.gitignore
@@ -3,4 +3,7 @@
.genkit
.DS_Store
-.env
\ No newline at end of file
+.env
+
+reference
+.qoder
\ No newline at end of file
diff --git a/ai/README.md b/ai/README.md
deleted file mode 100644
index e69de29b..00000000
diff --git a/ai/agent/agent.go b/ai/agent/agent.go
new file mode 100644
index 00000000..26183e71
--- /dev/null
+++ b/ai/agent/agent.go
@@ -0,0 +1,194 @@
+package agent
+
+import (
+ "context"
+ "dubbo-admin-ai/config"
+ "dubbo-admin-ai/internal/manager"
+ "dubbo-admin-ai/schema"
+ "fmt"
+
+ "github.com/firebase/genkit/go/core"
+)
+
+type NoStream = struct{}
+type StreamType interface {
+ NoStream | schema.StreamChunk
+}
+
+type Flow = *core.Flow[schema.Schema, schema.Schema, any]
+type NormalFlow = *core.Flow[schema.Schema, schema.Schema, NoStream]
+type StreamFlow = *core.Flow[schema.Schema, schema.Schema, schema.StreamChunk]
+
+type StreamHandler = func(*core.StreamingFlowValue[schema.Schema,
schema.StreamChunk], error) bool
+
+const (
+ ThinkFlowName string = "think"
+ StreamThinkFlowName string = "stream_think"
+ ActFlowName string = "act"
+ ReActFlowName string = "reAct"
+)
+
+type Agent interface {
+ Interact(schema.Schema) (schema.Schema, error)
+}
+
+// TODO: Use input/Output channel to support async execution
+type Stage struct {
+ flow any
+ streamHandler StreamHandler
+
+ Input schema.Schema
+ Output schema.Schema
+ StreamChan chan *schema.StreamChunk
+ OutputChan chan schema.Schema
+}
+
+func NewStage(flow any, inputSchema schema.Schema, outputSchema schema.Schema)
*Stage {
+ return &Stage{
+ flow: flow,
+ streamHandler: nil,
+ Input: inputSchema,
+ Output: outputSchema,
+ // InputChan: make(chan schema.Schema,
config.STAGE_CHANNEL_BUFFER_SIZE),
+ // OutputChan: make(chan schema.Schema,
config.STAGE_CHANNEL_BUFFER_SIZE),
+ }
+}
+
+func NewStreamStage(flow any, inputSchema schema.Schema, outputSchema
schema.Schema, onStreaming func(*Stage, schema.StreamChunk) error, onDone
func(*Stage, schema.Schema) error) (stage *Stage) {
+ if onStreaming == nil || onDone == nil {
+ panic("onStreaming, onDone and streamChan callbacks cannot be
nil for streaming stage")
+ }
+
+ stage = NewStage(flow, inputSchema, outputSchema)
+ stage.streamHandler = func(val *core.StreamingFlowValue[schema.Schema,
schema.StreamChunk], err error) bool {
+ if err != nil {
+ manager.GetLogger().Error("error", "err", err)
+ return false
+ }
+
+ if !val.Done {
+ if err := onStreaming(stage, val.Stream); err != nil {
+ manager.GetLogger().Error("Error when
streaming", "err", err)
+ return false
+ }
+ } else if val.Output != nil {
+ if err := onDone(stage, val.Output); err != nil {
+ manager.GetLogger().Error("Error when
finalizing", "err", err)
+ return false
+ }
+ }
+
+ return true
+ }
+
+ return stage
+}
+
+func (s *Stage) Execute(ctx context.Context, streamChan chan
*schema.StreamChunk, outputChan chan schema.Schema) (err error) {
+ if value, ok := s.flow.(NormalFlow); ok {
+ s.Output, err = value.Run(ctx, s.Input)
+ } else if value, ok := s.flow.(StreamFlow); ok {
+ if streamChan == nil || outputChan == nil || s.streamHandler ==
nil {
+ return fmt.Errorf("stream handler and channels cannot
be nil for stream flow")
+ }
+ s.StreamChan = streamChan
+ s.OutputChan = outputChan
+ value.Stream(ctx, s.Input)(s.streamHandler)
+ }
+
+ return err
+}
+
+type Orchestrator interface {
+ Run(context.Context, schema.Schema, chan *schema.StreamChunk, chan
schema.Schema)
+ RunStage(context.Context, string, schema.Schema, chan
*schema.StreamChunk, chan schema.Schema) (schema.Schema, error)
+}
+
+type OrderOrchestrator struct {
+ stages map[string]*Stage
+ order []string
+}
+
+func NewOrderOrchestrator(stages ...*Stage) *OrderOrchestrator {
+ stagesMap := make(map[string]*Stage, len(stages))
+ for _, stage := range stages {
+ // Get the flow name through interface method
+ var flowName string
+ if nFlow, ok := stage.flow.(NormalFlow); ok {
+ flowName = nFlow.Name()
+ } else if sFlow, ok := stage.flow.(StreamFlow); ok {
+ flowName = sFlow.Name()
+ }
+ stagesMap[flowName] = stage
+ }
+
+ order := make([]string, 2)
+ order[0] = StreamThinkFlowName
+ order[1] = ActFlowName
+
+ return &OrderOrchestrator{
+ stages: stagesMap,
+ order: order,
+ }
+}
+
+func (o *OrderOrchestrator) Run(ctx context.Context, userInput schema.Schema,
streamChan chan *schema.StreamChunk, outputChan chan schema.Schema) {
+ defer func() {
+ close(streamChan)
+ close(outputChan)
+ }()
+
+ // Use user initial input for the first round
+ if userInput == nil {
+ panic("userInput cannot be nil")
+ }
+ var input schema.Schema = userInput
+ // Iterate until reaching maximum iterations or status is Finished
+Outer:
+ for range config.MAX_REACT_ITERATIONS {
+ for _, order := range o.order {
+ // Execute current stage
+ curStage, ok := o.stages[order]
+ if !ok {
+ panic(fmt.Errorf("stage %s not found", order))
+ }
+
+ curStage.Input = input
+ if curStage.Input == nil {
+ panic(fmt.Errorf("stage %s input is nil",
order))
+ }
+
+ if err := curStage.Execute(ctx, streamChan,
outputChan); err != nil {
+ panic(fmt.Errorf("failed to execute stage %s:
%w", order, err))
+ }
+
+ // Use current stage Output as next stage input
+ if curStage.Output == nil {
+ panic(fmt.Errorf("stage %s output is nil",
order))
+ }
+ input = curStage.Output
+
+ // Check if LLM returned final answer
+ switch curStage.Output.(type) {
+ case schema.ThinkOutput:
+ out := curStage.Output.(schema.ThinkOutput)
+ if out.Status == schema.Finished &&
out.FinalAnswer != "" {
+ break Outer
+ }
+ }
+ }
+ }
+}
+
+func (o *OrderOrchestrator) RunStage(ctx context.Context, key string, input
schema.Schema, streamChan chan *schema.StreamChunk, outputChan chan
schema.Schema) (output schema.Schema, err error) {
+ stage, ok := o.stages[key]
+ if !ok {
+ return nil, fmt.Errorf("stage %s not found", key)
+ }
+
+ stage.Input = input
+ if err = stage.Execute(ctx, streamChan, outputChan); err != nil {
+ return nil, err
+ }
+ return stage.Output, nil
+}
diff --git a/ai/agent/react/react.go b/ai/agent/react/react.go
new file mode 100644
index 00000000..e5742e71
--- /dev/null
+++ b/ai/agent/react/react.go
@@ -0,0 +1,243 @@
+package react
+
+import (
+ "context"
+
+ "dubbo-admin-ai/agent"
+ "dubbo-admin-ai/config"
+ "dubbo-admin-ai/internal/manager"
+ "dubbo-admin-ai/internal/memory"
+ "dubbo-admin-ai/internal/tools"
+ "dubbo-admin-ai/schema"
+ "fmt"
+ "os"
+
+ "github.com/firebase/genkit/go/ai"
+ "github.com/firebase/genkit/go/core"
+ "github.com/firebase/genkit/go/genkit"
+)
+
+type ThinkIn = schema.ThinkInput
+type ThinkOut = schema.ThinkOutput
+type ActIn = ThinkOut
+type ActOut = schema.ToolOutputs
+
+// ReActAgent implements Agent interface
+type ReActAgent struct {
+ registry *genkit.Genkit
+ memoryCtx context.Context
+ orchestrator agent.Orchestrator
+}
+
+func Create(g *genkit.Genkit) *ReActAgent {
+ prompt := BuildThinkPrompt(g)
+ thinkStage := agent.NewStreamStage(
+ streamThink(g, prompt),
+ schema.ThinkInput{},
+ schema.ThinkOutput{},
+ func(stage *agent.Stage, chunk schema.StreamChunk) error {
+ if stage.StreamChan == nil {
+ return fmt.Errorf("streamChan is nil")
+ }
+ stage.StreamChan <- &chunk
+ return nil
+ },
+ func(stage *agent.Stage, output schema.Schema) error {
+ if stage.OutputChan == nil {
+ return fmt.Errorf("outputChan is nil")
+ }
+ stage.Output = output
+ stage.OutputChan <- output
+ return nil
+ },
+ )
+ actStage := agent.NewStage(act(g), schema.ThinkOutput{},
schema.ToolOutputs{})
+
+ orchestrator := agent.NewOrderOrchestrator(thinkStage, actStage)
+
+ return &ReActAgent{
+ registry: g,
+ orchestrator: orchestrator,
+ memoryCtx: memory.NewMemoryContext(memory.ChatHistoryKey),
+ }
+}
+
+func (ra *ReActAgent) Interact(input schema.Schema) (chan *schema.StreamChunk,
chan schema.Schema, error) {
+ streamChan := make(chan *schema.StreamChunk,
config.STAGE_CHANNEL_BUFFER_SIZE)
+ outputChan := make(chan schema.Schema, config.STAGE_CHANNEL_BUFFER_SIZE)
+ go func() {
+ ra.orchestrator.Run(ra.memoryCtx, input, streamChan, outputChan)
+ }()
+ return streamChan, outputChan, nil
+}
+
+func BuildThinkPrompt(registry *genkit.Genkit) ai.Prompt {
+ // Load system prompt from filesystem
+ data, err := os.ReadFile(config.PROMPT_DIR_PATH + "/agentSystem.prompt")
+ if err != nil {
+ panic(fmt.Errorf("failed to read agentSystem prompt: %w", err))
+ }
+ systemPromptText := string(data)
+
+ // Build and Register ReAct think prompt
+ mockToolManager := tools.NewMockToolManager(registry)
+ toolRefs, err := mockToolManager.AllToolRefs()
+ if err != nil {
+ panic(fmt.Errorf("failed to get mock mock_tools: %v", err))
+ }
+ return genkit.DefinePrompt(registry, "agentThinking",
+ ai.WithSystem(systemPromptText),
+ ai.WithInputType(ThinkIn{}),
+ ai.WithOutputType(ThinkOut{}),
+ ai.WithPrompt(schema.UserThinkPromptTemplate),
+ ai.WithTools(toolRefs...),
+ ai.WithReturnToolRequests(true),
+ )
+}
+
+func rawChunkHandler(cb core.StreamCallback[schema.StreamChunk])
ai.ModelStreamCallback {
+ return func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
+ if cb != nil {
+ return cb(ctx, schema.StreamChunk{
+ Stage: "think",
+ Chunk: chunk,
+ })
+ }
+ return nil
+ }
+}
+
+func streamThink(g *genkit.Genkit, prompt ai.Prompt) agent.StreamFlow {
+ return genkit.DefineStreamingFlow(g, agent.StreamThinkFlowName,
+ func(ctx context.Context, in schema.Schema, cb
core.StreamCallback[schema.StreamChunk]) (out schema.Schema, err error) {
+ manager.GetLogger().Info("Thinking...", "input", in)
+ defer func() {
+ manager.GetLogger().Info("Think Done.",
"output", out)
+ }()
+
+ history, ok :=
ctx.Value(memory.ChatHistoryKey).(*memory.History)
+ if !ok {
+ panic(fmt.Errorf("failed to get history from
context"))
+ }
+
+ var resp *ai.ModelResponse
+ // ai.WithStreaming() receives ai.ModelStreamCallback
type callback function
+ // This callback function is called when the model
generates each raw streaming chunk, used for raw chunk processing
+
+ // The passed cb is user-defined callback function for
handling streaming data logic, such as printing
+ if !history.IsEmpty() {
+ resp, err = prompt.Execute(ctx,
+ ai.WithInput(in),
+
ai.WithMessages(history.AllHistory()...),
+ ai.WithStreaming(rawChunkHandler(cb)),
+ )
+ } else {
+ resp, err = prompt.Execute(ctx,
+ ai.WithInput(in),
+ ai.WithStreaming(rawChunkHandler(cb)),
+ )
+ }
+ if err != nil {
+ return out, fmt.Errorf("failed to execute
agentThink prompt: %w", err)
+ }
+
+ // Parse output
+ var response ThinkOut
+ err = resp.Output(&response)
+
+ if err != nil {
+ return out, fmt.Errorf("failed to parse
agentThink prompt response: %w", err)
+ }
+
+ history.AddHistory(resp.History()...)
+ return response, nil
+ })
+}
+
+func think(g *genkit.Genkit, prompt ai.Prompt) agent.NormalFlow {
+ return genkit.DefineFlow(g, agent.ThinkFlowName,
+ func(ctx context.Context, in schema.Schema) (out schema.Schema,
err error) {
+ manager.GetLogger().Info("Thinking...", "input", in)
+ defer func() {
+ manager.GetLogger().Info("Think Done.",
"output", out)
+ }()
+
+ history, ok :=
ctx.Value(memory.ChatHistoryKey).(*memory.History)
+ if !ok {
+ panic(fmt.Errorf("failed to get history from
context"))
+ }
+
+ var resp *ai.ModelResponse
+ if !history.IsEmpty() {
+ resp, err = prompt.Execute(ctx,
+ ai.WithInput(in),
+
ai.WithMessages(history.AllHistory()...),
+ )
+ } else {
+ resp, err = prompt.Execute(ctx,
+ ai.WithInput(in),
+ )
+ }
+
+ if err != nil {
+ return out, fmt.Errorf("failed to execute
agentThink prompt: %w", err)
+ }
+
+ // Parse output
+ var response ThinkOut
+ err = resp.Output(&response)
+
+ if err != nil {
+ return out, fmt.Errorf("failed to parse
agentThink prompt response: %w", err)
+ }
+
+ history.AddHistory(resp.History()...)
+ return response, nil
+ })
+}
+
+// act: Core logic of the executor
+func act(g *genkit.Genkit) agent.NormalFlow {
+ return genkit.DefineFlow(g, agent.ActFlowName,
+ func(ctx context.Context, in schema.Schema) (out schema.Schema,
err error) {
+ manager.GetLogger().Info("Acting...", "input", in)
+ defer func() {
+ manager.GetLogger().Info("Act Done.", "output",
out)
+ }()
+
+ input, ok := in.(ActIn)
+ if !ok {
+ return nil, fmt.Errorf("input is not of type
ActIn, got %T", in)
+ }
+
+ var actOuts ActOut
+
+ // Execute tool calls
+ history, ok :=
ctx.Value(memory.ChatHistoryKey).(*memory.History)
+ if !ok {
+ panic(fmt.Errorf("failed to get history from
context"))
+ }
+
+ var parts []*ai.Part
+ for _, req := range input.ToolRequests {
+
+ output, err := req.Call(g, ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to call
tool %s: %w", req.ToolName, err)
+ }
+
+ parts = append(parts,
+ ai.NewToolResponsePart(&ai.ToolResponse{
+ Name: req.ToolName,
+ Ref: req.ToolName,
+ Output: output.Map(),
+ }))
+
+ actOuts.Add(&output)
+ }
+
+ history.AddHistory(ai.NewMessage(ai.RoleTool, nil,
parts...))
+
+ return actOuts, nil
+ })
+}
diff --git a/ai/agent/react/react_test.go b/ai/agent/react/react_test.go
new file mode 100644
index 00000000..b8662ce8
--- /dev/null
+++ b/ai/agent/react/react_test.go
@@ -0,0 +1,190 @@
+package react
+
+import (
+ "dubbo-admin-ai/agent"
+ "dubbo-admin-ai/config"
+ "dubbo-admin-ai/internal/manager"
+ "dubbo-admin-ai/internal/memory"
+ "dubbo-admin-ai/internal/tools"
+ "dubbo-admin-ai/plugins/dashscope"
+ "dubbo-admin-ai/schema"
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/firebase/genkit/go/core"
+)
+
+var (
+ reActAgent *ReActAgent
+ chatHistoryCtx = memory.NewMemoryContext(memory.ChatHistoryKey)
+)
+
+func init() {
+ manager.Init(dashscope.Qwen3.Key(), manager.PrettyLogger())
+ reActAgent = Create(manager.GetRegistry())
+}
+
+func TestThinking(t *testing.T) {
+ agentInput := ThinkIn{
+ Content: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
+ }
+
+ streamChan := make(chan *schema.StreamChunk,
config.STAGE_CHANNEL_BUFFER_SIZE)
+ outputChan := make(chan schema.Schema, config.STAGE_CHANNEL_BUFFER_SIZE)
+ defer func() {
+ close(streamChan)
+ close(outputChan)
+ }()
+ resp, err := reActAgent.orchestrator.RunStage(chatHistoryCtx,
agent.StreamThinkFlowName, agentInput, streamChan, outputChan)
+ if err != nil {
+ t.Fatalf("failed to run thinking flow: %v", err)
+ }
+
+ fmt.Println(resp)
+}
+
+func TestThinkWithToolReq(t *testing.T) {
+ input := ActOut{
+ Outputs: []tools.ToolOutput{
+ {
+ ToolName: "prometheus_query_service_latency",
+ Summary: "服务 order-service 在过去10分钟内的 P95 延迟为
3500ms",
+ Result: map[string]any{
+ "quantile": 0.95,
+ "value_millis": 3500,
+ },
+ },
+ {
+ ToolName: "prometheus_query_service_traffic",
+ Summary: "服务 order-service 的 QPS 为 250.0, 错误率为
5.2%",
+ Result: map[string]any{
+ "error_rate_percentage": 5.2,
+ "request_rate_qps": 250,
+ },
+ },
+ },
+ }
+
+ streamChan := make(chan *schema.StreamChunk,
config.STAGE_CHANNEL_BUFFER_SIZE)
+ outputChan := make(chan schema.Schema, config.STAGE_CHANNEL_BUFFER_SIZE)
+ defer func() {
+ close(streamChan)
+ close(outputChan)
+ }()
+ resp, err := reActAgent.orchestrator.RunStage(chatHistoryCtx,
agent.StreamThinkFlowName, input, streamChan, outputChan)
+
+ if err != nil {
+ t.Fatalf("failed to run thinking flow: %v", err)
+ }
+
+ fmt.Println(resp)
+}
+
+func TestAct(t *testing.T) {
+ actInJson := `{
+ "tool_requests": [
+ {
+ "tool_name": "prometheus_query_service_latency",
+ "parameter": {
+ "service_name": "order-service",
+ "time_range_minutes": 15,
+ "quantile": 0.95
+ }
+ },
+ {
+ "tool_name": "prometheus_query_service_traffic",
+ "parameter": {
+ "service_name": "order-service",
+ "time_range_minutes": 15
+ }
+ },
+ {
+ "tool_name": "trace_dependency_view",
+ "parameter": {
+ "service_name": "order-service"
+ }
+ },
+ {
+ "tool_name": "dubbo_service_status",
+ "parameter": {
+ "service_name": "order-service"
+ }
+ }
+ ],
+ "status": "CONTINUED",
+ "thought": "开始对 order-service
运行缓慢的问题进行系统性诊断。首先需要了解当前服务的整体性能表现,包括延迟、流量和错误率等关键指标。同时,获取服务的实例状态和拓扑依赖关系,以判断是否存在明显的异常。由于多个数据源可独立查询,为提高效率,将并行执行多个工具调用:查询延迟指标、服务流量、服务依赖关系和服务实例状态。"
+}`
+ actIn := ActIn{}
+ if err := json.Unmarshal([]byte(actInJson), &actIn); err != nil {
+ t.Fatalf("failed to unmarshal actInJson: %v", err)
+ }
+
+ actOuts, err := reActAgent.orchestrator.RunStage(chatHistoryCtx,
agent.ActFlowName, actIn, nil, nil)
+ resp := actOuts
+ if err != nil {
+ t.Fatalf("failed to run act flow: %v", err)
+ }
+ if resp == nil {
+ t.Fatal("expected non-nil response")
+ }
+
+}
+
+func TestAgent(t *testing.T) {
+ agentInput := ThinkIn{
+ Content: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
+ }
+
+ streamChan, outputChan, err := reActAgent.Interact(agentInput)
+ if err != nil {
+ t.Fatalf("failed to run thinking flow: %v", err)
+ }
+
+ for {
+ select {
+ case chunk, ok := <-streamChan:
+ if !ok {
+ streamChan = nil
+ continue
+ }
+ fmt.Print(chunk.Chunk.Text())
+
+ case finalOutput, ok := <-outputChan:
+ if !ok {
+ outputChan = nil
+ continue
+ }
+ fmt.Printf("Final output: %+v\n", finalOutput)
+ default:
+ if streamChan == nil && outputChan == nil {
+ return
+ }
+ }
+ }
+}
+
+func TestStreamThink(t *testing.T) {
+ manager.Init(dashscope.Qwen3.Key(), manager.PrettyLogger())
+ prompt := BuildThinkPrompt(manager.GetRegistry())
+ chatHistoryCtx := memory.NewMemoryContext(memory.ChatHistoryKey)
+
+ agentInput := ThinkIn{
+ Content: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
+ }
+
+ stream := streamThink(manager.GetRegistry(),
prompt).Stream(chatHistoryCtx, agentInput)
+ stream(func(val *core.StreamingFlowValue[schema.Schema,
schema.StreamChunk], err error) bool {
+ if err != nil {
+ fmt.Printf("Stream error: %v\n", err)
+ return false
+ }
+ if !val.Done {
+ fmt.Print(val.Stream.Chunk.Text())
+ } else if val.Output != nil {
+ fmt.Printf("Stream final output: %+v\n", val.Output)
+ }
+
+ return true
+ })
+}
diff --git a/ai/config/config.go b/ai/config/config.go
new file mode 100644
index 00000000..2f001c8b
--- /dev/null
+++ b/ai/config/config.go
@@ -0,0 +1,27 @@
+package config
+
+import (
+ "dubbo-admin-ai/plugins/dashscope"
+ "dubbo-admin-ai/plugins/model"
+ "os"
+ "path/filepath"
+ "runtime"
+)
+
+var (
+ // API keys
+ GEMINI_API_KEY string = os.Getenv("GEMINI_API_KEY")
+ SILICONFLOW_API_KEY string = os.Getenv("SILICONFLOW_API_KEY")
+ DASHSCOPE_API_KEY string = os.Getenv("DASHSCOPE_API_KEY")
+
+ // Configuration
+ // Automatically get project root directory
+ _, b, _, _ = runtime.Caller(0)
+ PROJECT_ROOT = filepath.Join(filepath.Dir(b), "..")
+
+ PROMPT_DIR_PATH string = filepath.Join(PROJECT_ROOT,
"prompts")
+ LOG_LEVEL string = os.Getenv("LOG_LEVEL")
+ DEFAULT_MODEL model.Model = dashscope.Qwen3
+ MAX_REACT_ITERATIONS int = 5
+ STAGE_CHANNEL_BUFFER_SIZE int = 5
+)
diff --git a/ai/go.mod b/ai/go.mod
index f0fa8b8d..74d16133 100644
--- a/ai/go.mod
+++ b/ai/go.mod
@@ -4,18 +4,42 @@ go 1.24.1
toolchain go1.24.5
+replace github.com/firebase/genkit/go => github.com/stringl1l1l1l/genkit/go
v0.0.0-20250915045204-bf3e8da21e69
+
require (
- github.com/firebase/genkit/go v0.6.2
+ github.com/firebase/genkit/go v1.0.2
+ github.com/gin-gonic/gin v1.10.1
github.com/joho/godotenv v1.5.1
github.com/lmittmann/tint v1.1.2
- github.com/openai/openai-go v0.1.0-alpha.65
+ github.com/mitchellh/mapstructure v1.5.0
+ github.com/openai/openai-go v1.8.2
)
require (
+ github.com/bytedance/sonic v1.11.6 // indirect
+ github.com/bytedance/sonic/loader v0.1.1 // indirect
+ github.com/cloudwego/base64x v0.1.4 // indirect
+ github.com/cloudwego/iasm v0.2.0 // indirect
+ github.com/gabriel-vasile/mimetype v1.4.3 // indirect
+ github.com/gin-contrib/sse v0.1.0 // indirect
+ github.com/go-playground/locales v0.14.1 // indirect
+ github.com/go-playground/universal-translator v0.18.1 // indirect
+ github.com/go-playground/validator/v10 v10.20.0 // indirect
+ github.com/goccy/go-json v0.10.2 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
+ github.com/klauspost/cpuid/v2 v2.2.7 // indirect
+ github.com/leodido/go-urn v1.4.0 // indirect
+ github.com/mattn/go-isatty v0.0.20 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd //
indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
+ github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
+ github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+ github.com/ugorji/go/codec v1.2.12 // indirect
+ golang.org/x/arch v0.8.0 // indirect
)
require (
@@ -24,6 +48,7 @@ require (
cloud.google.com/go/compute/metadata v0.7.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
+ github.com/dusted-go/logging v1.3.0
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@@ -31,17 +56,18 @@ require (
github.com/google/dotprompt/go v0.0.0-20250611200215-bb73406b05ca //
indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/s2a-go v0.1.9 // indirect
- github.com/google/uuid v1.6.0 // indirect
+ github.com/google/uuid v1.6.0
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
- github.com/invopop/jsonschema v0.13.0 // indirect
+ github.com/invopop/jsonschema v0.13.0
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a //
indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb //
indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415
// indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
+ github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0
// indirect
go.opentelemetry.io/otel v1.36.0 // indirect
@@ -52,7 +78,7 @@ require (
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
- google.golang.org/genai v1.11.1 // indirect
+ google.golang.org/genai v1.24.0 // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
diff --git a/ai/go.sum b/ai/go.sum
index ab1e3298..555d79e7 100644
--- a/ai/go.sum
+++ b/ai/go.sum
@@ -8,18 +8,43 @@ github.com/bahlo/generic-list-go v0.2.0
h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPn
github.com/bahlo/generic-list-go v0.2.0/go.mod
h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/buger/jsonparser v1.1.1
h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod
h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
+github.com/bytedance/sonic v1.11.6
h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
+github.com/bytedance/sonic v1.11.6/go.mod
h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
+github.com/bytedance/sonic/loader v0.1.1
h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
+github.com/bytedance/sonic/loader v0.1.1/go.mod
h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
+github.com/cloudwego/base64x v0.1.4
h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
+github.com/cloudwego/base64x v0.1.4/go.mod
h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
+github.com/cloudwego/iasm v0.2.0
h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
+github.com/cloudwego/iasm v0.2.0/go.mod
h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dusted-go/logging v1.3.0
h1:SL/EH1Rp27oJQIte+LjWvWACSnYDTqNx5gZULin0XRY=
+github.com/dusted-go/logging v1.3.0/go.mod
h1:s58+s64zE5fxSWWZfp+b8ZV0CHyKHjamITGyuY1wzGg=
github.com/felixge/httpsnoop v1.0.4
h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod
h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
-github.com/firebase/genkit/go v0.6.2
h1:FaVJtcprfXZz0gXTtARJqUiovu/R2wuJycNn/18aNMc=
-github.com/firebase/genkit/go v0.6.2/go.mod
h1:blRYK6oNgwBDX6F+gInACru6q527itviv+xruiMSUuU=
+github.com/gabriel-vasile/mimetype v1.4.3
h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
+github.com/gabriel-vasile/mimetype v1.4.3/go.mod
h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
+github.com/gin-contrib/sse v0.1.0
h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
+github.com/gin-contrib/sse v0.1.0/go.mod
h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
+github.com/gin-gonic/gin v1.10.1
h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ=
+github.com/gin-gonic/gin v1.10.1/go.mod
h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-logr/logr v1.2.2/go.mod
h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-playground/assert/v2 v2.2.0
h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
+github.com/go-playground/assert/v2 v2.2.0/go.mod
h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
+github.com/go-playground/locales v0.14.1
h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
+github.com/go-playground/locales v0.14.1/go.mod
h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
+github.com/go-playground/universal-translator v0.18.1
h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
+github.com/go-playground/universal-translator v0.18.1/go.mod
h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
+github.com/go-playground/validator/v10 v10.20.0
h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
+github.com/go-playground/validator/v10 v10.20.0/go.mod
h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
+github.com/goccy/go-json v0.10.2
h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
+github.com/goccy/go-json v0.10.2/go.mod
h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-yaml v1.17.1
h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY=
github.com/goccy/go-yaml v1.17.1/go.mod
h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
@@ -28,6 +53,7 @@ github.com/google/dotprompt/go
v0.0.0-20250611200215-bb73406b05ca h1:LuQ8KS5N04c
github.com/google/dotprompt/go v0.0.0-20250611200215-bb73406b05ca/go.mod
h1:dnIk+MSMnipm9uZyPIgptq7I39aDxyjBiaev/OG0W0Y=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod
h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
@@ -42,27 +68,57 @@ github.com/invopop/jsonschema v0.13.0
h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcI
github.com/invopop/jsonschema v0.13.0/go.mod
h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod
h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+github.com/json-iterator/go v1.1.12
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/klauspost/cpuid/v2 v2.0.9/go.mod
h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/klauspost/cpuid/v2 v2.2.7
h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
+github.com/klauspost/cpuid/v2 v2.2.7/go.mod
h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/knz/go-libedit v1.10.1/go.mod
h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod
h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/leodido/go-urn v1.4.0
h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
+github.com/leodido/go-urn v1.4.0/go.mod
h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/lmittmann/tint v1.1.2
h1:2CQzrL6rslrsyjqLDwD11bZ5OpLBPU+g3G/r5LSfS8w=
github.com/lmittmann/tint v1.1.2/go.mod
h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/mailru/easyjson v0.9.0
h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
github.com/mailru/easyjson v0.9.0/go.mod
h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
+github.com/mattn/go-isatty v0.0.20
h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod
h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a
h1:v2cBA3xWKv2cIOVhnzX/gNgkNXqiHfUgJtA3r61Hf7A=
github.com/mbleigh/raymond v0.0.0-20250414171441-6b3a58ab9e0a/go.mod
h1:Y6ghKH+ZijXn5d9E7qGGZBmjitx7iitZdQiIW97EpTU=
-github.com/openai/openai-go v0.1.0-alpha.65
h1:G12sA6OaL+cVMElMO3m5RVFwKhhg40kmGeGhaYZIoYw=
-github.com/openai/openai-go v0.1.0-alpha.65/go.mod
h1:3SdE6BffOX9HPEQv8IL/fi3LYZ5TUpRYaqGQZbyk11A=
+github.com/mitchellh/mapstructure v1.5.0
h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod
h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/openai/openai-go v1.8.2
h1:UqSkJ1vCOPUpz9Ka5tS0324EJFEuOvMc+lA/EarJWP8=
+github.com/openai/openai-go v1.8.2/go.mod
h1:g461MYGXEXBVdV5SaR/5tNzNbSfwTBBefwc+LlDCK0Y=
+github.com/pelletier/go-toml/v2 v2.2.2
h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+github.com/pelletier/go-toml/v2 v2.2.2/go.mod
h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.13.1
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2/go.mod
h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/stringl1l1l1l/genkit/go v0.0.0-20250915045204-bf3e8da21e69
h1:+wzggJwSECW2wpQFpK+WIZYxQmTWKg1M869d4i6QrZg=
+github.com/stringl1l1l1l/genkit/go v0.0.0-20250915045204-bf3e8da21e69/go.mod
h1:GabAxvHNs9ZSvmaK5bfZe2NkTsGP544/baVFegXq4aU=
github.com/tidwall/gjson v1.14.2/go.mod
h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0
h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod
h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
@@ -73,6 +129,10 @@ github.com/tidwall/pretty v1.2.1
h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod
h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod
h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
+github.com/twitchyliquid64/golang-asm v0.15.1
h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod
h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/ugorji/go/codec v1.2.12
h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod
h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/wk8/go-ordered-map/v2 v2.1.8
h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod
h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod
h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -82,6 +142,8 @@ github.com/xeipuuv/gojsonreference
v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod
h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0
h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod
h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
+github.com/yosida95/uritemplate/v3 v3.0.2
h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
+github.com/yosida95/uritemplate/v3 v3.0.2/go.mod
h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
go.opentelemetry.io/auto/sdk v1.1.0
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0
h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus=
@@ -98,18 +160,23 @@ go.opentelemetry.io/otel/trace v1.36.0
h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKr
go.opentelemetry.io/otel/trace v1.36.0/go.mod
h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod
h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
+golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod
h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod
h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
-google.golang.org/genai v1.11.1 h1:MgI2JVDaIQ1YMuzKFwgPciB+K6kQ8MCBMVL9u7Oa8qw=
-google.golang.org/genai v1.11.1/go.mod
h1:HFXR1zT3LCdLxd/NW6IOSCczOYyRAxwaShvYbgPSeVw=
+google.golang.org/genai v1.24.0 h1:j5lt+Qr7W0+OBxwwEPe4DQ+ygEqpvZuSBvYoHIuUjhg=
+google.golang.org/genai v1.24.0/go.mod
h1:QPj5NGJw+3wEOHg+PrsWwJKvG6UC84ex5FR7qAYsN/M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822
h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE=
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250603155806-513f23925822/go.mod
h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
@@ -119,5 +186,8 @@ google.golang.org/protobuf v1.36.6/go.mod
h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+nullprogram.com/x/optparse v1.0.0/go.mod
h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
diff --git a/ai/internal/agent/flow.go b/ai/internal/agent/flow.go
deleted file mode 100644
index e39e359e..00000000
--- a/ai/internal/agent/flow.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package agent
-
-import (
- "context"
-
- "dubbo-admin-ai/internal/config"
- "dubbo-admin-ai/internal/manager"
- "dubbo-admin-ai/internal/schema"
- "dubbo-admin-ai/internal/tools"
- "errors"
- "fmt"
- "log"
- "os"
-
- "github.com/firebase/genkit/go/core/logger"
-
- "github.com/firebase/genkit/go/ai"
- "github.com/firebase/genkit/go/core"
- "github.com/firebase/genkit/go/genkit"
-)
-
-// 公开的 Flow 变量,以便在编排器中调用
-var (
- ReActFlow *core.Flow[schema.ReActIn, schema.ReActOut, struct{}]
- ThinkingFlow *core.Flow[schema.ThinkIn, *schema.ThinkOut, struct{}]
- ActFlow *core.Flow[*schema.ActIn, schema.ActOut, struct{}]
-
- ThinkPrompt *ai.Prompt
-)
-
-var g *genkit.Genkit
-
-// The order of initialization cannot change
-func InitAgent(defaultModel string) (err error) {
- if err = manager.LoadEnvVars(); err != nil {
- return err
- }
-
- manager.InitLogger()
- if err := manager.InitGlobalGenkit(defaultModel); err != nil {
- return err
- }
-
- if g, err = manager.GetGlobalGenkit(); err != nil {
- return err
- }
-
- tools.RegisterAllMockTools(g)
-
- if err = InitFlows(g); err != nil {
- return err
- }
-
- return nil
-}
-
-func InitFlows(registry *genkit.Genkit) error {
- if registry == nil {
- return fmt.Errorf("registry is nil")
- }
- g = registry
-
- data, err := os.ReadFile(config.PROMPT_DIR_PATH + "/agentSystem.prompt")
- if err != nil {
- return fmt.Errorf("failed to read agentSystem prompt: %w", err)
- }
- systemPromptText := string(data)
-
- mockTools, err := tools.AllMockToolRef()
- if err != nil {
- log.Fatalf("failed to get mock mock_tools: %v", err)
- }
- ThinkPrompt, err = genkit.DefinePrompt(g, "agentThinking",
- ai.WithSystem(systemPromptText),
- ai.WithInputType(schema.ThinkIn{}),
- ai.WithOutputType(schema.ThinkOut{}),
- ai.WithPrompt("{{userInput}}"),
- ai.WithTools(mockTools...),
- )
-
- if err != nil {
- return fmt.Errorf("failed to define agentThink prompt: %w", err)
- }
-
- ReActFlow = genkit.DefineFlow(g, "reAct", reAct)
- ThinkingFlow = genkit.DefineFlow(g, "thinking", thinking)
- ActFlow = genkit.DefineFlow(g, "act", act)
-
- return nil
-}
-
-// Flow 的核心函数实现 `fn` (不对外导出)
-// ----------------------------------------------------------------------------
-// 1. agentOrchestrator: 总指挥/编排器的核心逻辑
-// ----------------------------------------------------------------------------
-func reAct(ctx context.Context, reActInput schema.ReActIn) (reActOut
schema.ReActOut, err error) {
- //TODO: 输入数据意图解析
-
- thinkingInput := reActInput
- // 主协调循环 (Reconciliation Loop)
- for i := 0; i < config.MAX_REACT_ITERATIONS; i++ {
-
- // a. 调用 thinkingFlow
- thinkingResp, err := ThinkingFlow.Run(ctx, thinkingInput)
- if err != nil {
- manager.GetLogger().Error("failed to run thinking
flow", "error", err)
- return "", err
- }
- if thinkingResp == nil {
- manager.GetLogger().Error("expected non-nil response")
- return "", errors.New("expected non-nil response")
- }
-
- for _, r := range thinkingResp.ToolRequests {
- manager.GetLogger().Info(r.String())
- }
-
- reActOut = thinkingResp.String()
-
- // b. 检查是否有最终答案
- if thinkingResp.IsStop() {
- break
- }
-
- // c. 调用 actFlow
- actOutArray, err := ActFlow.Run(ctx, thinkingResp)
- if err != nil {
- logger.FromContext(ctx).Error("failed to run act flow",
"error", err)
- return "", err
- }
-
- // TODO: 这里将来会集成SessionMemory,暂时简化处理
- _ = actOutArray // 占位符,避免未使用变量错误
- }
-
- return reActOut, nil
-}
-
-// ----------------------------------------------------------------------------
-// 2. thinking: 大脑/思考者的核心逻辑
-// ----------------------------------------------------------------------------
-func thinking(ctx context.Context, input schema.ThinkIn) (*schema.ThinkOut,
error) {
- resp, err := ThinkPrompt.Execute(ctx,
- ai.WithInput(input),
- )
-
- if err != nil {
- return nil, fmt.Errorf("failed to execute agentThink prompt:
%w", err)
- }
-
- // 解析输出
- var response *schema.ThinkOut
- err = resp.Output(&response)
- if err != nil {
- return nil, fmt.Errorf("failed to parse agentThink prompt
response: %w", err)
- }
-
- return response, nil
-}
-
-// ----------------------------------------------------------------------------
-// 3. act: 执行者的核心逻辑
-// TODO: Genkit 的 Tool 设计不能保证任意 LLM 一定能执行工具调用,是否考虑设计成所有模型都能执行?
-// 一种思路是让 LLM 直接生成工具调用的结构化描述,然后由适配器执行。
-// 管线、流水线的思想能否用在这里?
-func act(ctx context.Context, actIn *schema.ActIn) (schema.ActOut, error) {
- var actOutArray schema.ActOut
- // 执行工具调用
- for _, resp := range actIn.ToolRequests {
- if err := resp.ValidateToolDesc(); err != nil {
- return nil, fmt.Errorf("invalid tool description in
action response: %w", err)
- }
-
- output, err := resp.ToolDesc.Call(g, ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to call tool %s: %w",
resp.ToolDesc.ToolName, err)
- }
- actOutArray = append(actOutArray, output)
- }
-
- return actOutArray, nil
-}
diff --git a/ai/internal/config/config.go b/ai/internal/config/config.go
deleted file mode 100644
index b071b846..00000000
--- a/ai/internal/config/config.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package config
-
-import (
- "dubbo-admin-ai/plugins/siliconflow"
- "os"
- "path/filepath"
- "runtime"
-)
-
-var (
- // API keys
- GEMINI_API_KEY string = os.Getenv("GEMINI_API_KEY")
- SILICONFLOW_API_KEY string = os.Getenv("SILICONFLOW_API_KEY")
- DASHSCOPE_API_KEY string = os.Getenv("DASHSCOPE_API_KEY")
-
- // Configuration
- // 自动获取项目根目录
- _, b, _, _ = runtime.Caller(0)
- PROJECT_ROOT = filepath.Join(filepath.Dir(b), "..", "..")
-
- PROMPT_DIR_PATH string = filepath.Join(PROJECT_ROOT, "prompts")
- LOG_LEVEL string = os.Getenv("LOG_LEVEL")
- DEFAULT_MODEL string = siliconflow.DeepSeekV3
- MAX_REACT_ITERATIONS int = 5
-)
diff --git a/ai/internal/manager/manager.go b/ai/internal/manager/manager.go
index 2c0e8d17..c276c406 100644
--- a/ai/internal/manager/manager.go
+++ b/ai/internal/manager/manager.go
@@ -2,20 +2,19 @@ package manager
import (
"context"
-
- "dubbo-admin-ai/internal/config"
+ "dubbo-admin-ai/config"
+ "sync"
"dubbo-admin-ai/plugins/dashscope"
"dubbo-admin-ai/plugins/siliconflow"
"dubbo-admin-ai/utils"
- "fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
- "github.com/firebase/genkit/go/core/logger"
+ "github.com/dusted-go/logging/prettylog"
"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/plugins/googlegenai"
"github.com/joho/godotenv"
@@ -23,21 +22,50 @@ import (
)
var (
- globalGenkit *genkit.Genkit
- rootContext *context.Context
- globalLogger *slog.Logger
+ registry *genkit.Genkit
+ gloLogger *slog.Logger
+ once sync.Once
)
-func InitGlobalGenkit(defaultModel string) (err error) {
- ctx := context.Background()
- if rootContext == nil {
- rootContext = &ctx
+func Init(modelName string, logger *slog.Logger) {
+ once.Do(func() {
+ loadEnvVars()
+ registry = defaultRegistry(modelName)
+ gloLogger = logger
+
+ if logger == nil {
+ gloLogger = DevLogger()
+ }
+ })
+}
+
+// Load environment variables from PROJECT_ROOT/.env file
+func loadEnvVars() {
+ dotEnvFilePath := filepath.Join(config.PROJECT_ROOT, ".env")
+ dotEnvExampleFilePath := filepath.Join(config.PROJECT_ROOT,
".env.example")
+
+ // Check if the .env file exists, if not, copy .env.example to .env
+ if _, err := os.Stat(dotEnvFilePath); os.IsNotExist(err) {
+ if err = utils.CopyFile(dotEnvExampleFilePath, dotEnvFilePath);
err != nil {
+ panic(err)
+ }
}
- g, err := genkit.Init(*rootContext,
+
+ // Load environment variables
+ if err := godotenv.Load(dotEnvFilePath); err != nil {
+ panic(err)
+ }
+
+}
+
+func defaultRegistry(modelName string) *genkit.Genkit {
+ ctx := context.Background()
+ return genkit.Init(ctx,
genkit.WithPlugins(
&siliconflow.SiliconFlow{
APIKey: config.SILICONFLOW_API_KEY,
},
+
&googlegenai.GoogleAI{
APIKey: config.GEMINI_API_KEY,
},
@@ -45,19 +73,12 @@ func InitGlobalGenkit(defaultModel string) (err error) {
APIKey: config.DASHSCOPE_API_KEY,
},
),
- genkit.WithDefaultModel(defaultModel),
+ genkit.WithDefaultModel(modelName),
genkit.WithPromptDir(config.PROMPT_DIR_PATH),
)
-
- if g == nil {
- return fmt.Errorf("fail to initialize global genkit")
- }
-
- globalGenkit = g
- return err
}
-func InitLogger() {
+func DevLogger() *slog.Logger {
logLevel := slog.LevelInfo
if envLevel := config.LOG_LEVEL; envLevel != "" {
switch strings.ToUpper(envLevel) {
@@ -71,56 +92,55 @@ func InitLogger() {
logLevel = slog.LevelError
}
}
- logger.SetLevel(logLevel)
slog.SetDefault(
slog.New(
tint.NewHandler(os.Stderr, &tint.Options{
- Level: slog.LevelDebug,
+ Level: logLevel,
AddSource: true,
TimeFormat: time.Kitchen,
}),
),
)
- globalLogger = slog.Default()
+ return slog.Default()
}
-func GetGlobalGenkit() (*genkit.Genkit, error) {
- var err error
- if globalGenkit == nil {
- return nil, fmt.Errorf("global genkit is nil, initialize genkit
first")
- }
- return globalGenkit, err
+func ReleaseLogger() *slog.Logger {
+ slog.SetDefault(
+ slog.New(
+ tint.NewHandler(os.Stderr, &tint.Options{
+ Level: slog.LevelInfo,
+ AddSource: true,
+ TimeFormat: time.Kitchen,
+ }),
+ ),
+ )
+ return slog.Default()
}
-func GetLogger() *slog.Logger {
- if globalLogger == nil {
- InitLogger()
- }
- return globalLogger
+func PrettyLogger() *slog.Logger {
+ slog.SetDefault(
+ slog.New(
+ prettylog.NewHandler(&slog.HandlerOptions{
+ Level: slog.LevelInfo,
+ AddSource: false,
+ ReplaceAttr: nil,
+ }),
+ ),
+ )
+ return slog.Default()
}
-func GetRootContext() context.Context {
- ctx := context.Background()
- if rootContext == nil {
- rootContext = &ctx
+func GetRegistry() *genkit.Genkit {
+ if registry == nil {
+ registry = defaultRegistry(config.DEFAULT_MODEL.Key())
}
- return *rootContext
+ return registry
}
-// Load environment variables from PROJECT_ROOT/.env file
-func LoadEnvVars() (err error) {
- dotEnvFilePath := filepath.Join(config.PROJECT_ROOT, ".env")
- dotEnvExampleFilePath := filepath.Join(config.PROJECT_ROOT,
".env.example")
-
- // Check if the .env file exists, if not, copy .env.example to .env
- if _, err = os.Stat(dotEnvFilePath); os.IsNotExist(err) {
- if err = utils.CopyFile(dotEnvExampleFilePath, dotEnvFilePath);
err != nil {
- return err
- }
+func GetLogger() *slog.Logger {
+ if gloLogger == nil {
+ gloLogger = PrettyLogger()
}
-
- // Load environment variables
- err = godotenv.Load(dotEnvFilePath)
- return err
+ return gloLogger
}
diff --git a/ai/internal/memory/history.go b/ai/internal/memory/history.go
new file mode 100644
index 00000000..78f38344
--- /dev/null
+++ b/ai/internal/memory/history.go
@@ -0,0 +1,44 @@
+package memory
+
+import (
+ "context"
+
+ "github.com/firebase/genkit/go/ai"
+)
+
+type HistoryKey string
+
+const (
+ ChatHistoryKey HistoryKey = "chat_history"
+ SystemMemoryKey HistoryKey = "system_memory"
+ CoreMemoryKey HistoryKey = "core_memory"
+)
+
+type History struct {
+ history []*ai.Message
+}
+
+func NewMemoryContext(key HistoryKey) context.Context {
+ return context.WithValue(
+ context.Background(),
+ key,
+ &History{
+ history: make([]*ai.Message, 0),
+ })
+}
+
+func (h *History) AddHistory(message ...*ai.Message) {
+ h.history = append(h.history, message...)
+}
+
+func (h *History) IsEmpty() bool {
+ return len(h.history) == 0
+}
+
+func (h *History) AllHistory() []*ai.Message {
+ return h.history
+}
+
+func (h *History) Clear() {
+ h.history = make([]*ai.Message, 0)
+}
diff --git a/ai/internal/memory/memory.go b/ai/internal/memory/memory.go
new file mode 100644
index 00000000..a7831238
--- /dev/null
+++ b/ai/internal/memory/memory.go
@@ -0,0 +1,482 @@
+package memory
+
+import (
+ "strings"
+ "time"
+
+ "github.com/firebase/genkit/go/ai"
+)
+
+const (
+ SystemWindowLimit = 10
+ CoreWindowLimit = 20
+ ChatWindowLimit = 50
+
+ // Block size limits
+ MaxBlockSize = 2048 // Maximum Block character count
+ DefaultImportance = 5 // Default importance level
+ MaxImportance = 10 // Maximum importance level
+)
+
+type Memory struct {
+ SystemWindow *MemoryWindow
+ CoreWindow *MemoryWindow
+ ChatWindow *MemoryWindow
+}
+
+type MemoryWindow struct {
+ Blocks []MemoryBlock
+ EvictionStrategy EvictionStrategy
+ WindowSize int
+}
+
+type MemoryBlock interface {
+ CompileToPrompt() *ai.Prompt
+ Limit() int
+ Size() int
+ Priority() int
+ UpdatePriority() error
+ Reserved() bool
+ Split(maxSize int) ([]MemoryBlock, error)
+}
+
+type EvictionStrategy interface {
+ OnAccess(window *MemoryWindow) error
+ OnInsert(window *MemoryWindow) error
+ Evict(window *MemoryWindow) error
+}
+
+type FIFO struct {
+}
+
+type LRU struct {
+}
+
+// SystemMemoryBlock system memory block
+type SystemMemoryBlock struct {
+ content string
+ timestamp int64
+ priority int
+ reserved bool
+}
+
+func (s *SystemMemoryBlock) CompileToPrompt() *ai.Prompt {
+ return nil
+}
+
+func (s *SystemMemoryBlock) Limit() int {
+ return MaxBlockSize
+}
+
+func (s *SystemMemoryBlock) Size() int {
+ return len(s.content)
+}
+
+func (s *SystemMemoryBlock) Priority() int {
+ return s.priority
+}
+
+func (s *SystemMemoryBlock) UpdatePriority() error {
+ s.timestamp = time.Now().UnixNano()
+ s.priority = -int(s.timestamp / 1e6) // Convert to milliseconds and
negate
+ return nil
+}
+
+func (s *SystemMemoryBlock) Reserved() bool {
+ return s.reserved
+}
+
+func (s *SystemMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+ if s.Size() <= maxSize {
+ return []MemoryBlock{s}, nil
+ }
+
+ parts := splitBySemanticBoundary(s.content, maxSize)
+ blocks := make([]MemoryBlock, len(parts))
+
+ for i, part := range parts {
+ blocks[i] = &SystemMemoryBlock{
+ content: part,
+ timestamp: s.timestamp,
+ priority: s.priority,
+ reserved: s.reserved,
+ }
+ }
+
+ return blocks, nil
+}
+
+// CoreMemoryBlock core memory block
+type CoreMemoryBlock struct {
+ content string
+ timestamp int64
+ priority int
+ importance int
+ reserved bool
+}
+
+func (c *CoreMemoryBlock) CompileToPrompt() *ai.Prompt {
+ // TODO: Need genkit registry to create prompt, return nil for now
+ return nil
+}
+
+func (c *CoreMemoryBlock) Limit() int {
+ return MaxBlockSize
+}
+
+func (c *CoreMemoryBlock) Size() int {
+ return len(c.content)
+}
+
+func (c *CoreMemoryBlock) Priority() int {
+ return c.priority
+}
+
+func (c *CoreMemoryBlock) UpdatePriority() error {
+ c.timestamp = time.Now().UnixMilli()
+ c.priority = calculatePriority(c.timestamp, c.importance)
+ return nil
+}
+
+func (c *CoreMemoryBlock) Reserved() bool {
+ return c.reserved
+}
+
+func (c *CoreMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+ if c.Size() <= maxSize {
+ return []MemoryBlock{c}, nil
+ }
+
+ parts := splitBySemanticBoundary(c.content, maxSize)
+ blocks := make([]MemoryBlock, len(parts))
+
+ for i, part := range parts {
+ blocks[i] = &CoreMemoryBlock{
+ content: part,
+ timestamp: c.timestamp,
+ priority: c.priority,
+ importance: c.importance,
+ reserved: c.reserved,
+ }
+ }
+
+ return blocks, nil
+}
+
+// ChatHistoryMemoryBlock chat history memory block
+type ChatHistoryMemoryBlock struct {
+ content string
+ timestamp int64
+ priority int
+ reserved bool
+}
+
+func (ch *ChatHistoryMemoryBlock) CompileToPrompt() *ai.Prompt {
+ // TODO: Need genkit registry to create prompt, return nil for now
+ return nil
+}
+
+func (ch *ChatHistoryMemoryBlock) Limit() int {
+ return MaxBlockSize
+}
+
+func (ch *ChatHistoryMemoryBlock) Size() int {
+ return len(ch.content)
+}
+
+func (ch *ChatHistoryMemoryBlock) Priority() int {
+ return ch.priority
+}
+
+func (ch *ChatHistoryMemoryBlock) UpdatePriority() error {
+ ch.timestamp = time.Now().UnixNano()
+ ch.priority = -int(ch.timestamp / 1e6) // Convert to milliseconds and
negate
+ return nil
+}
+
+func (ch *ChatHistoryMemoryBlock) Reserved() bool {
+ return ch.reserved
+}
+
+func (ch *ChatHistoryMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+ if ch.Size() <= maxSize {
+ return []MemoryBlock{ch}, nil
+ }
+
+ parts := splitBySemanticBoundary(ch.content, maxSize)
+ blocks := make([]MemoryBlock, len(parts))
+
+ for i, part := range parts {
+ blocks[i] = &ChatHistoryMemoryBlock{
+ content: part,
+ timestamp: ch.timestamp,
+ priority: ch.priority,
+ reserved: ch.reserved,
+ }
+ }
+
+ return blocks, nil
+}
+
+// FIFO strategy implementation
+func (f *FIFO) OnAccess(window *MemoryWindow) error {
+ // FIFO does not need to update priority on access
+ return nil
+}
+
+func (f *FIFO) OnInsert(window *MemoryWindow) error {
+ if len(window.Blocks) > 0 {
+ lastBlock := window.Blocks[len(window.Blocks)-1]
+ err := lastBlock.UpdatePriority()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (f *FIFO) Evict(window *MemoryWindow) error {
+ if len(window.Blocks) == 0 {
+ return nil
+ }
+
+ minPriority := int(^uint(0) >> 1) // Max int value
+ evictIndex := -1
+
+ for i, block := range window.Blocks {
+ if !block.Reserved() && block.Priority() < minPriority {
+ minPriority = block.Priority()
+ evictIndex = i
+ }
+ }
+
+ if evictIndex >= 0 {
+ // Remove found block
+ window.Blocks = append(window.Blocks[:evictIndex],
window.Blocks[evictIndex+1:]...)
+ }
+
+ return nil
+}
+
+// LRU strategy implementation
+func (l *LRU) OnAccess(window *MemoryWindow) error {
+ // Update priority of recently accessed block
+ if len(window.Blocks) > 0 {
+ lastBlock := window.Blocks[len(window.Blocks)-1]
+ err := lastBlock.UpdatePriority()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (l *LRU) OnInsert(window *MemoryWindow) error {
+ if len(window.Blocks) > 0 {
+ lastBlock := window.Blocks[len(window.Blocks)-1]
+ err := lastBlock.UpdatePriority()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (l *LRU) Evict(window *MemoryWindow) error {
+ if len(window.Blocks) == 0 {
+ return nil
+ }
+
+ minPriority := int(^uint(0) >> 1) // Max int value
+ evictIndex := -1
+
+ for i, block := range window.Blocks {
+ if !block.Reserved() && block.Priority() < minPriority {
+ minPriority = block.Priority()
+ evictIndex = i
+ }
+ }
+
+ if evictIndex >= 0 {
+ // Remove found block
+ window.Blocks = append(window.Blocks[:evictIndex],
window.Blocks[evictIndex+1:]...)
+ }
+
+ return nil
+}
+
+// MemoryWindow method implementation
+func (mw *MemoryWindow) Insert(block MemoryBlock) error {
+ // Check if eviction is needed
+ for mw.NeedsEviction() {
+ err := mw.EvictionStrategy.Evict(mw)
+ if err != nil {
+ return err
+ }
+ }
+
+ // Insert new block
+ mw.Blocks = append(mw.Blocks, block)
+
+ // Trigger insert event
+ return mw.EvictionStrategy.OnInsert(mw)
+}
+
+func (mw *MemoryWindow) NeedsEviction() bool {
+ return len(mw.Blocks) >= mw.WindowSize
+}
+
+func (mw *MemoryWindow) FindBlock(predicate func(MemoryBlock) bool)
*MemoryBlock {
+ for i := range mw.Blocks {
+ if predicate(mw.Blocks[i]) {
+ // Trigger access event
+ mw.EvictionStrategy.OnAccess(mw)
+ return &mw.Blocks[i]
+ }
+ }
+ return nil
+}
+
+// Memory method implementation
+func NewMemory() *Memory {
+ return &Memory{
+ SystemWindow: &MemoryWindow{
+ Blocks: make([]MemoryBlock, 0),
+ EvictionStrategy: &FIFO{},
+ WindowSize: SystemWindowLimit,
+ },
+ CoreWindow: &MemoryWindow{
+ Blocks: make([]MemoryBlock, 0),
+ EvictionStrategy: &LRU{},
+ WindowSize: CoreWindowLimit,
+ },
+ ChatWindow: &MemoryWindow{
+ Blocks: make([]MemoryBlock, 0),
+ EvictionStrategy: &LRU{},
+ WindowSize: ChatWindowLimit,
+ },
+ }
+}
+
+func (m *Memory) AddSystemMemory(content string, reserved bool) error {
+ block := &SystemMemoryBlock{
+ content: content,
+ timestamp: time.Now().UnixNano(),
+ priority: 0,
+ reserved: reserved,
+ }
+ block.UpdatePriority()
+
+ // Check if splitting is needed
+ if block.Size() > MaxBlockSize {
+ blocks, err := block.Split(MaxBlockSize)
+ if err != nil {
+ return err
+ }
+ for _, b := range blocks {
+ err := m.SystemWindow.Insert(b)
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ return m.SystemWindow.Insert(block)
+ }
+
+ return nil
+}
+
+func (m *Memory) AddCoreMemory(content string, importance int, reserved bool)
error {
+ block := &CoreMemoryBlock{
+ content: content,
+ timestamp: time.Now().UnixNano(),
+ priority: 0,
+ importance: importance,
+ reserved: reserved,
+ }
+ block.UpdatePriority()
+
+ // Check if splitting is needed
+ if block.Size() > MaxBlockSize {
+ blocks, err := block.Split(MaxBlockSize)
+ if err != nil {
+ return err
+ }
+ for _, b := range blocks {
+ err := m.CoreWindow.Insert(b)
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ return m.CoreWindow.Insert(block)
+ }
+
+ return nil
+}
+
+func (m *Memory) AddChatHistory(content string) error {
+ block := &ChatHistoryMemoryBlock{
+ content: content,
+ timestamp: time.Now().UnixNano(),
+ priority: 0,
+ reserved: false,
+ }
+ block.UpdatePriority()
+
+ // Check if splitting is needed
+ if block.Size() > MaxBlockSize {
+ blocks, err := block.Split(MaxBlockSize)
+ if err != nil {
+ return err
+ }
+ for _, b := range blocks {
+ err := m.ChatWindow.Insert(b)
+ if err != nil {
+ return err
+ }
+ }
+ } else {
+ return m.ChatWindow.Insert(block)
+ }
+
+ return nil
+}
+
+func (m *Memory) CompileAllToPrompt() *ai.Prompt {
+ // TODO: Need genkit registry to create unified prompt, return nil for
now
+ return nil
+}
+
+// Helper function implementation
+func splitBySemanticBoundary(content string, maxSize int) []string {
+ if len(content) <= maxSize {
+ return []string{content}
+ }
+
+ var parts []string
+ sentences := strings.Split(content, ".")
+ currentPart := ""
+
+ for _, sentence := range sentences {
+ testPart := currentPart + sentence + "."
+ if len(testPart) > maxSize && currentPart != "" {
+ parts = append(parts, strings.TrimSpace(currentPart))
+ currentPart = sentence + "."
+ } else {
+ currentPart = testPart
+ }
+ }
+
+ if currentPart != "" {
+ parts = append(parts, strings.TrimSpace(currentPart))
+ }
+
+ return parts
+}
+
+func calculatePriority(timestamp int64, importance int) int {
+ timeScore := -int(timestamp / 1e6) // Higher timestamp means
higher priority
+ importanceBonus := importance * 1000000 // Importance bonus
+ return timeScore + importanceBonus
+}
diff --git a/ai/internal/schema/llm_schema.go b/ai/internal/schema/llm_schema.go
deleted file mode 100644
index f54f1f7d..00000000
--- a/ai/internal/schema/llm_schema.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package schema
-
-import (
- "dubbo-admin-ai/internal/tools"
- "encoding/json"
- "fmt"
-)
-
-// 表示 LLM 的响应结构
-type ReActIn = ReActInput
-type ReActOut = string
-
-type ThinkIn = ReActInput
-type ThinkOut = ThinkAggregation
-
-type ActIn = ThinkOut
-type ActOut = []string
-
-type ReActInput struct {
- UserInput string `json:"userInput"`
- // 移除SessionID和SessionMemory,这些由Session管理
-}
-
-type Status string
-
-const (
- Continued Status = "CONTINUED"
- Finished Status = "FINISHED"
- Pending Status = "PENDING"
-)
-
-type ToolRequest struct {
- Thought string `json:"thought"`
- ToolDesc tools.ToolDesc `json:"tool_desc"`
-}
-
-type ThinkAggregation struct {
- ToolRequests []ToolRequest `json:"tool_requests"`
- Status Status `json:"status"
jsonschema:"enum=CONTINUED,enum=FINISHED,enum=PENDING"`
- Thought string `json:"thought"`
- FinalAnswer string `json:"final_answer,omitempty"
jsonschema:"required=false"`
-}
-
-// ReActInput
-func (a ReActInput) String() string {
- data, err := json.MarshalIndent(a, "", " ")
- if err != nil {
- return fmt.Sprintf("ReActInput{error: %v}", err)
- }
- return string(data)
-}
-
-// ToolRequest
-// HasAction 检查是否包含行动指令
-func (tr ToolRequest) HasAction() bool {
- return tr.ToolDesc.ToolInput != nil && tr.ToolDesc.ToolName != ""
-}
-
-// ValidateToolDesc 验证行动指令的有效性
-func (tr ToolRequest) ValidateToolDesc() error {
- if !tr.HasAction() {
- return fmt.Errorf("no valid action found")
- }
-
- if tr.ToolDesc.ToolName == "" {
- return fmt.Errorf("tool_name cannot be empty")
- }
-
- // 验证支持的工具名称
- supportedTools, err := tools.AllMockToolNames()
- if err != nil {
- return fmt.Errorf("failed to get supported tools: %w", err)
- }
-
- if _, ok := supportedTools[tr.ToolDesc.ToolName]; !ok {
- return fmt.Errorf("unsupported tool: %s", tr.ToolDesc.ToolName)
- }
-
- return nil
-}
-
-// String 实现 fmt.Stringer 接口,返回格式化的 JSON
-func (tr ToolRequest) String() string {
- data, err := json.MarshalIndent(tr, "", " ")
- if err != nil {
- return fmt.Sprintf("ToolRequest{error: %v}", err)
- }
- return string(data)
-}
-
-// ThinkAggregation
-func (ta ThinkAggregation) IsStop() bool {
- return ta.Status == Finished && ta.FinalAnswer != ""
-}
-
-func (ta ThinkAggregation) String() string {
- data, err := json.MarshalIndent(ta, "", " ")
- if err != nil {
- return fmt.Sprintf("ThinkAggregation{error: %v}", err)
- }
- return string(data)
-}
diff --git a/ai/internal/tools/mock_tools.go b/ai/internal/tools/mock_tools.go
index d61663b2..bf2c229a 100644
--- a/ai/internal/tools/mock_tools.go
+++ b/ai/internal/tools/mock_tools.go
@@ -1,8 +1,8 @@
package tools
import (
+ "dubbo-admin-ai/internal/manager"
"fmt"
- "log"
"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/genkit"
@@ -11,29 +11,27 @@ import (
// ================================================
// Prometheus Query Service Latency Tool
// ================================================
-type ToolOutput interface {
- Tool() string
-}
-
type PrometheusServiceLatencyInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to query"`
- TimeRangeMinutes int `json:"timeRangeMinutes"
jsonschema:"required,description=Time range in minutes"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to query"`
+ TimeRangeMinutes int `json:"time_range_minutes"
jsonschema:"required,description=Time range in minutes"`
Quantile float64 `json:"quantile"
jsonschema:"required,description=Quantile to query (e.g., 0.99 or 0.95)"`
}
+func (i PrometheusServiceLatencyInput) String() string {
+ return fmt.Sprintf("PrometheusServiceLatencyInput{ServiceName: %s,
TimeRangeMinutes: %d, Quantile: %.2f}", i.ServiceName, i.TimeRangeMinutes,
i.Quantile)
+}
+
type PrometheusServiceLatencyOutput struct {
- ToolName string `json:"toolName"`
Quantile float64 `json:"quantile"`
- ValueMillis int `json:"valueMillis"`
- Summary string `json:"summary"`
+ ValueMillis int `json:"value_millis"`
}
-func (o PrometheusServiceLatencyOutput) Tool() string {
- return o.ToolName
+func (o PrometheusServiceLatencyOutput) String() string {
+ return fmt.Sprintf("PrometheusServiceLatencyOutput{Quantile: %.2f,
ValueMillis: %d}", o.Quantile, o.ValueMillis)
}
-func prometheusQueryServiceLatency(ctx *ai.ToolContext, input
PrometheusServiceLatencyInput) (PrometheusServiceLatencyOutput, error) {
- log.Printf("Tool 'prometheus_query_service_latency' called for service:
%s", input.ServiceName)
+func prometheusQueryServiceLatency(ctx *ai.ToolContext, input
PrometheusServiceLatencyInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'prometheus_query_service_latency'
called", "service", input.ServiceName)
// Mock data based on the example in prompt
valueMillis := 3500
@@ -41,11 +39,15 @@ func prometheusQueryServiceLatency(ctx *ai.ToolContext,
input PrometheusServiceL
valueMillis = 850
}
- return PrometheusServiceLatencyOutput{
- ToolName: "prometheus_query_service_latency",
+ output := PrometheusServiceLatencyOutput{
Quantile: input.Quantile,
ValueMillis: valueMillis,
- Summary: fmt.Sprintf("服务 %s 在过去%d分钟内的 P%.0f 延迟为 %dms",
input.ServiceName, input.TimeRangeMinutes, input.Quantile*100, valueMillis),
+ }
+
+ return ToolOutput{
+ ToolName: "prometheus_query_service_latency",
+ Summary: fmt.Sprintf("服务 %s 在过去%d分钟内的 P%.0f 延迟为 %dms",
input.ServiceName, input.TimeRangeMinutes, input.Quantile*100,
output.ValueMillis),
+ Result: output,
}, nil
}
@@ -54,29 +56,35 @@ func prometheusQueryServiceLatency(ctx *ai.ToolContext,
input PrometheusServiceL
// ================================================
type PrometheusServiceTrafficInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to query"`
- TimeRangeMinutes int `json:"timeRangeMinutes"
jsonschema:"required,description=Time range in minutes"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to query"`
+ TimeRangeMinutes int `json:"time_range_minutes"
jsonschema:"required,description=Time range in minutes"`
+}
+
+func (i PrometheusServiceTrafficInput) String() string {
+ return fmt.Sprintf("PrometheusServiceTrafficInput{ServiceName: %s,
TimeRangeMinutes: %d}", i.ServiceName, i.TimeRangeMinutes)
}
type PrometheusServiceTrafficOutput struct {
- ToolName string `json:"toolName"`
- RequestRateQPS float64 `json:"requestRateQPS"`
- ErrorRatePercentage float64 `json:"errorRatePercentage"`
- Summary string `json:"summary"`
+ RequestRateQPS float64 `json:"request_rate_qps"`
+ ErrorRatePercentage float64 `json:"error_rate_percentage"`
}
-func (o PrometheusServiceTrafficOutput) Tool() string {
- return o.ToolName
+func (o PrometheusServiceTrafficOutput) String() string {
+ return fmt.Sprintf("PrometheusServiceTrafficOutput{RequestRateQPS:
%.1f, ErrorRatePercentage: %.1f}", o.RequestRateQPS, o.ErrorRatePercentage)
}
-func prometheusQueryServiceTraffic(ctx *ai.ToolContext, input
PrometheusServiceTrafficInput) (PrometheusServiceTrafficOutput, error) {
- log.Printf("Tool 'prometheus_query_service_traffic' called for service:
%s", input.ServiceName)
+func prometheusQueryServiceTraffic(ctx *ai.ToolContext, input
PrometheusServiceTrafficInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'prometheus_query_service_traffic'
called", "service", input.ServiceName)
- return PrometheusServiceTrafficOutput{
- ToolName: "prometheus_query_service_traffic",
+ output := PrometheusServiceTrafficOutput{
RequestRateQPS: 250.0,
ErrorRatePercentage: 5.2,
- Summary: fmt.Sprintf("服务 %s 的 QPS 为 250, 错误率为
5.2%%", input.ServiceName),
+ }
+
+ return ToolOutput{
+ ToolName: "prometheus_query_service_traffic",
+ Summary: fmt.Sprintf("服务 %s 的 QPS 为 %.1f, 错误率为 %.1f%%",
input.ServiceName, output.RequestRateQPS, output.ErrorRatePercentage),
+ Result: output,
}, nil
}
@@ -85,7 +93,11 @@ func prometheusQueryServiceTraffic(ctx *ai.ToolContext,
input PrometheusServiceT
// ================================================
type QueryTimeseriesDatabaseInput struct {
- PromqlQuery string `json:"promqlQuery"
jsonschema:"required,description=PromQL query to execute"`
+ PromqlQuery string `json:"promql_query"
jsonschema:"required,description=PromQL query to execute"`
+}
+
+func (i QueryTimeseriesDatabaseInput) String() string {
+ return fmt.Sprintf("QueryTimeseriesDatabaseInput{PromqlQuery: %s}",
i.PromqlQuery)
}
type TimeseriesMetric struct {
@@ -103,22 +115,19 @@ type TimeseriesResult struct {
}
type QueryTimeseriesDatabaseOutput struct {
- ToolName string `json:"toolName"`
- Query string `json:"query"`
- Results []TimeseriesResult `json:"results"`
- Summary string `json:"summary"`
+ Query string `json:"query"`
+ Results []TimeseriesResult `json:"results"`
}
-func (o QueryTimeseriesDatabaseOutput) Tool() string {
- return o.ToolName
+func (o QueryTimeseriesDatabaseOutput) String() string {
+ return fmt.Sprintf("QueryTimeseriesDatabaseOutput{Query: %s, Results:
%d items}", o.Query, len(o.Results))
}
-func queryTimeseriesDatabase(ctx *ai.ToolContext, input
QueryTimeseriesDatabaseInput) (QueryTimeseriesDatabaseOutput, error) {
- log.Printf("Tool 'query_timeseries_database' called with query: %s",
input.PromqlQuery)
+func queryTimeseriesDatabase(ctx *ai.ToolContext, input
QueryTimeseriesDatabaseInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'query_timeseries_database' called",
"query", input.PromqlQuery)
- return QueryTimeseriesDatabaseOutput{
- ToolName: "query_timeseries_database",
- Query: input.PromqlQuery,
+ output := QueryTimeseriesDatabaseOutput{
+ Query: input.PromqlQuery,
Results: []TimeseriesResult{
{
Metric: TimeseriesMetric{Pod:
"order-service-pod-1"},
@@ -129,7 +138,12 @@ func queryTimeseriesDatabase(ctx *ai.ToolContext, input
QueryTimeseriesDatabaseI
Value: TimeseriesValue{Timestamp: 1692192000,
Value: "3.2"},
},
},
- Summary: "查询返回了 2 个时间序列",
+ }
+
+ return ToolOutput{
+ ToolName: "query_timeseries_database",
+ Summary: "查询返回了 2 个时间序列",
+ Result: output,
}, nil
}
@@ -138,33 +152,34 @@ func queryTimeseriesDatabase(ctx *ai.ToolContext, input
QueryTimeseriesDatabaseI
// ================================================
type ApplicationPerformanceProfilingInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to profile"`
- PodName string `json:"podName"
jsonschema:"required,description=The specific pod name to profile"`
- DurationSeconds int `json:"durationSeconds"
jsonschema:"required,description=Duration of profiling in seconds"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to profile"`
+ PodName string `json:"pod_name"
jsonschema:"required,description=The specific pod name to profile"`
+ DurationSeconds int `json:"duration_seconds"
jsonschema:"required,description=Duration of profiling in seconds"`
+}
+
+func (i ApplicationPerformanceProfilingInput) String() string {
+ return fmt.Sprintf("ApplicationPerformanceProfilingInput{ServiceName:
%s, PodName: %s, DurationSeconds: %d}", i.ServiceName, i.PodName,
i.DurationSeconds)
}
type PerformanceHotspot struct {
- CPUTimePercentage float64 `json:"cpuTimePercentage"`
- StackTrace []string `json:"stackTrace"`
+ CPUTimePercentage float64 `json:"cpu_time_percentage"`
+ StackTrace []string `json:"stack_trace"`
}
type ApplicationPerformanceProfilingOutput struct {
- ToolName string `json:"toolName"`
Status string `json:"status"`
- TotalSamples int `json:"totalSamples"`
+ TotalSamples int `json:"total_samples"`
Hotspots []PerformanceHotspot `json:"hotspots"`
- Summary string `json:"summary"`
}
-func (o ApplicationPerformanceProfilingOutput) Tool() string {
- return o.ToolName
+func (o ApplicationPerformanceProfilingOutput) String() string {
+ return fmt.Sprintf("ApplicationPerformanceProfilingOutput{Status: %s,
TotalSamples: %d, Hotspots: %d items}", o.Status, o.TotalSamples,
len(o.Hotspots))
}
-func applicationPerformanceProfiling(ctx *ai.ToolContext, input
ApplicationPerformanceProfilingInput) (ApplicationPerformanceProfilingOutput,
error) {
- log.Printf("Tool 'application_performance_profiling' called for
service: %s, pod: %s", input.ServiceName, input.PodName)
+func applicationPerformanceProfiling(ctx *ai.ToolContext, input
ApplicationPerformanceProfilingInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'application_performance_profiling'
called", "service", input.ServiceName, "pod", input.PodName)
- return ApplicationPerformanceProfilingOutput{
- ToolName: "application_performance_profiling",
+ output := ApplicationPerformanceProfilingOutput{
Status: "completed",
TotalSamples: 10000,
Hotspots: []PerformanceHotspot{
@@ -177,7 +192,12 @@ func applicationPerformanceProfiling(ctx *ai.ToolContext,
input ApplicationPerfo
},
},
},
- Summary: "性能分析显示,45.5%的CPU时间消耗在数据库查询调用链上",
+ }
+
+ return ToolOutput{
+ ToolName: "application_performance_profiling",
+ Summary: "性能分析显示,45.5%的CPU时间消耗在数据库查询调用链上",
+ Result: output,
}, nil
}
@@ -186,31 +206,37 @@ func applicationPerformanceProfiling(ctx *ai.ToolContext,
input ApplicationPerfo
// ================================================
type JVMPerformanceAnalysisInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the Java service to analyze"`
- PodName string `json:"podName" jsonschema:"required,description=The
specific pod name to analyze"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the Java service to analyze"`
+ PodName string `json:"pod_name"
jsonschema:"required,description=The specific pod name to analyze"`
+}
+
+func (i JVMPerformanceAnalysisInput) String() string {
+ return fmt.Sprintf("JVMPerformanceAnalysisInput{ServiceName: %s,
PodName: %s}", i.ServiceName, i.PodName)
}
type JVMPerformanceAnalysisOutput struct {
- ToolName string `json:"toolName"`
- FullGcCountLastHour int `json:"fullGcCountLastHour"`
- FullGcTimeAvgMillis int `json:"fullGcTimeAvgMillis"`
- HeapUsagePercentage float64 `json:"heapUsagePercentage"`
- Summary string `json:"summary"`
+ FullGcCountLastHour int `json:"full_gc_count_last_hour"`
+ FullGcTimeAvgMillis int `json:"full_gc_time_avg_millis"`
+ HeapUsagePercentage float64 `json:"heap_usage_percentage"`
}
-func (o JVMPerformanceAnalysisOutput) Tool() string {
- return o.ToolName
+func (o JVMPerformanceAnalysisOutput) String() string {
+ return fmt.Sprintf("JVMPerformanceAnalysisOutput{FullGcCountLastHour:
%d, FullGcTimeAvgMillis: %d, HeapUsagePercentage: %.1f}",
o.FullGcCountLastHour, o.FullGcTimeAvgMillis, o.HeapUsagePercentage)
}
-func jvmPerformanceAnalysis(ctx *ai.ToolContext, input
JVMPerformanceAnalysisInput) (JVMPerformanceAnalysisOutput, error) {
- log.Printf("Tool 'jvm_performance_analysis' called for service: %s,
pod: %s", input.ServiceName, input.PodName)
+func jvmPerformanceAnalysis(ctx *ai.ToolContext, input
JVMPerformanceAnalysisInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'jvm_performance_analysis' called",
"service", input.ServiceName, "pod", input.PodName)
- return JVMPerformanceAnalysisOutput{
- ToolName: "jvm_performance_analysis",
+ output := JVMPerformanceAnalysisOutput{
FullGcCountLastHour: 15,
FullGcTimeAvgMillis: 1200,
HeapUsagePercentage: 85.5,
- Summary: "GC activity is high, average Full GC time
is 1200ms",
+ }
+
+ return ToolOutput{
+ ToolName: "jvm_performance_analysis",
+ Summary: "GC activity is high, average Full GC time is 1200ms",
+ Result: output,
}, nil
}
@@ -219,26 +245,34 @@ func jvmPerformanceAnalysis(ctx *ai.ToolContext, input
JVMPerformanceAnalysisInp
// ================================================
type TraceDependencyViewInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to analyze
dependencies"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to analyze
dependencies"`
+}
+
+func (i TraceDependencyViewInput) String() string {
+ return fmt.Sprintf("TraceDependencyViewInput{ServiceName: %s}",
i.ServiceName)
}
type TraceDependencyViewOutput struct {
- ToolName string `json:"toolName"`
- UpstreamServices []string `json:"upstreamServices"`
- DownstreamServices []string `json:"downstreamServices"`
+ UpstreamServices []string `json:"upstream_services"`
+ DownstreamServices []string `json:"downstream_services"`
}
-func (o TraceDependencyViewOutput) Tool() string {
- return o.ToolName
+func (o TraceDependencyViewOutput) String() string {
+ return fmt.Sprintf("TraceDependencyViewOutput{UpstreamServices: %v,
DownstreamServices: %v}", o.UpstreamServices, o.DownstreamServices)
}
-func traceDependencyView(ctx *ai.ToolContext, input TraceDependencyViewInput)
(TraceDependencyViewOutput, error) {
- log.Printf("Tool 'trace_dependency_view' called for service: %s",
input.ServiceName)
+func traceDependencyView(ctx *ai.ToolContext, input TraceDependencyViewInput)
(ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'trace_dependency_view' called",
"service", input.ServiceName)
- return TraceDependencyViewOutput{
- ToolName: "trace_dependency_view",
+ output := TraceDependencyViewOutput{
UpstreamServices: []string{"api-gateway", "user-service"},
DownstreamServices: []string{"mysql-orders-db", "redis-cache",
"payment-service"},
+ }
+
+ return ToolOutput{
+ ToolName: "trace_dependency_view",
+ Summary: fmt.Sprintf("服务 %s 的上下游依赖关系查询完成", input.ServiceName),
+ Result: output,
}, nil
}
@@ -247,32 +281,33 @@ func traceDependencyView(ctx *ai.ToolContext, input
TraceDependencyViewInput) (T
// ================================================
type TraceLatencyAnalysisInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to analyze"`
- TimeRangeMinutes int `json:"timeRangeMinutes"
jsonschema:"required,description=Time range in minutes"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to analyze"`
+ TimeRangeMinutes int `json:"time_range_minutes"
jsonschema:"required,description=Time range in minutes"`
+}
+
+func (i TraceLatencyAnalysisInput) String() string {
+ return fmt.Sprintf("TraceLatencyAnalysisInput{ServiceName: %s,
TimeRangeMinutes: %d}", i.ServiceName, i.TimeRangeMinutes)
}
type LatencyBottleneck struct {
- DownstreamService string `json:"downstreamService"`
- LatencyAvgMillis int `json:"latencyAvgMillis"`
- ContributionPercentage float64 `json:"contributionPercentage"`
+ DownstreamService string `json:"downstream_service"`
+ LatencyAvgMillis int `json:"latency_avg_millis"`
+ ContributionPercentage float64 `json:"contribution_percentage"`
}
type TraceLatencyAnalysisOutput struct {
- ToolName string `json:"toolName"`
- TotalLatencyAvgMillis int `json:"totalLatencyAvgMillis"`
+ TotalLatencyAvgMillis int
`json:"total_latency_avg_millis"`
Bottlenecks []LatencyBottleneck `json:"bottlenecks"`
- Summary string `json:"summary"`
}
-func (o TraceLatencyAnalysisOutput) Tool() string {
- return o.ToolName
+func (o TraceLatencyAnalysisOutput) String() string {
+ return fmt.Sprintf("TraceLatencyAnalysisOutput{TotalLatencyAvgMillis:
%d, Bottlenecks: %d items}", o.TotalLatencyAvgMillis, len(o.Bottlenecks))
}
-func traceLatencyAnalysis(ctx *ai.ToolContext, input
TraceLatencyAnalysisInput) (TraceLatencyAnalysisOutput, error) {
- log.Printf("Tool 'trace_latency_analysis' called for service: %s",
input.ServiceName)
+func traceLatencyAnalysis(ctx *ai.ToolContext, input
TraceLatencyAnalysisInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'trace_latency_analysis' called",
"service", input.ServiceName)
- return TraceLatencyAnalysisOutput{
- ToolName: "trace_latency_analysis",
+ output := TraceLatencyAnalysisOutput{
TotalLatencyAvgMillis: 3200,
Bottlenecks: []LatencyBottleneck{
{
@@ -286,7 +321,12 @@ func traceLatencyAnalysis(ctx *ai.ToolContext, input
TraceLatencyAnalysisInput)
ContributionPercentage: 4.7,
},
},
- Summary: "平均总延迟为 3200ms。瓶颈已定位,95.3% 的延迟来自对下游 'mysql-orders-db'
的调用",
+ }
+
+ return ToolOutput{
+ ToolName: "trace_latency_analysis",
+ Summary: "平均总延迟为 3200ms。瓶颈已定位,95.3% 的延迟来自对下游 'mysql-orders-db'
的调用",
+ Result: output,
}, nil
}
@@ -295,32 +335,38 @@ func traceLatencyAnalysis(ctx *ai.ToolContext, input
TraceLatencyAnalysisInput)
// ================================================
type DatabaseConnectionPoolAnalysisInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service to analyze"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service to analyze"`
+}
+
+func (i DatabaseConnectionPoolAnalysisInput) String() string {
+ return fmt.Sprintf("DatabaseConnectionPoolAnalysisInput{ServiceName:
%s}", i.ServiceName)
}
type DatabaseConnectionPoolAnalysisOutput struct {
- ToolName string `json:"toolName"`
- MaxConnections int `json:"maxConnections"`
- ActiveConnections int `json:"activeConnections"`
- IdleConnections int `json:"idleConnections"`
- PendingRequests int `json:"pendingRequests"`
- Summary string `json:"summary"`
+ MaxConnections int `json:"max_connections"`
+ ActiveConnections int `json:"active_connections"`
+ IdleConnections int `json:"idle_connections"`
+ PendingRequests int `json:"pending_requests"`
}
-func (o DatabaseConnectionPoolAnalysisOutput) Tool() string {
- return o.ToolName
+func (o DatabaseConnectionPoolAnalysisOutput) String() string {
+ return
fmt.Sprintf("DatabaseConnectionPoolAnalysisOutput{MaxConnections: %d,
ActiveConnections: %d, IdleConnections: %d, PendingRequests: %d}",
o.MaxConnections, o.ActiveConnections, o.IdleConnections, o.PendingRequests)
}
-func databaseConnectionPoolAnalysis(ctx *ai.ToolContext, input
DatabaseConnectionPoolAnalysisInput) (DatabaseConnectionPoolAnalysisOutput,
error) {
- log.Printf("Tool 'database_connection_pool_analysis' called for
service: %s", input.ServiceName)
+func databaseConnectionPoolAnalysis(ctx *ai.ToolContext, input
DatabaseConnectionPoolAnalysisInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'database_connection_pool_analysis'
called", "service", input.ServiceName)
- return DatabaseConnectionPoolAnalysisOutput{
- ToolName: "database_connection_pool_analysis",
+ output := DatabaseConnectionPoolAnalysisOutput{
MaxConnections: 100,
ActiveConnections: 100,
IdleConnections: 0,
PendingRequests: 58,
- Summary: "数据库连接池已完全耗尽 (100/100),当前有 58 个请求正在排队等待连接",
+ }
+
+ return ToolOutput{
+ ToolName: "database_connection_pool_analysis",
+ Summary: "数据库连接池已完全耗尽 (100/100),当前有 58 个请求正在排队等待连接",
+ Result: output,
}, nil
}
@@ -329,35 +375,36 @@ func databaseConnectionPoolAnalysis(ctx *ai.ToolContext,
input DatabaseConnectio
// ================================================
type KubernetesGetPodResourcesInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service"`
Namespace string `json:"namespace"
jsonschema:"required,description=The namespace of the service"`
}
+func (i KubernetesGetPodResourcesInput) String() string {
+ return fmt.Sprintf("KubernetesGetPodResourcesInput{ServiceName: %s,
Namespace: %s}", i.ServiceName, i.Namespace)
+}
+
type PodResource struct {
- PodName string `json:"podName"`
- CPUUsageCores float64 `json:"cpuUsageCores"`
- CPURequestCores float64 `json:"cpuRequestCores"`
- CPULimitCores float64 `json:"cpuLimitCores"`
- MemoryUsageMi int `json:"memoryUsageMi"`
- MemoryRequestMi int `json:"memoryRequestMi"`
- MemoryLimitMi int `json:"memoryLimitMi"`
+ PodName string `json:"pod_name"`
+ CPUUsageCores float64 `json:"cpu_usage_cores"`
+ CPURequestCores float64 `json:"cpu_request_cores"`
+ CPULimitCores float64 `json:"cpu_limit_cores"`
+ MemoryUsageMi int `json:"memory_usage_mi"`
+ MemoryRequestMi int `json:"memory_request_mi"`
+ MemoryLimitMi int `json:"memory_limit_mi"`
}
type KubernetesGetPodResourcesOutput struct {
- ToolName string `json:"toolName"`
- Pods []PodResource `json:"pods"`
- Summary string `json:"summary"`
+ Pods []PodResource `json:"pods"`
}
-func (o KubernetesGetPodResourcesOutput) Tool() string {
- return o.ToolName
+func (o KubernetesGetPodResourcesOutput) String() string {
+ return fmt.Sprintf("KubernetesGetPodResourcesOutput{Pods: %d items}",
len(o.Pods))
}
-func kubernetesGetPodResources(ctx *ai.ToolContext, input
KubernetesGetPodResourcesInput) (KubernetesGetPodResourcesOutput, error) {
- log.Printf("Tool 'kubernetes_get_pod_resources' called for service: %s
in namespace: %s", input.ServiceName, input.Namespace)
+func kubernetesGetPodResources(ctx *ai.ToolContext, input
KubernetesGetPodResourcesInput) (ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'kubernetes_get_pod_resources' called",
"service", input.ServiceName, "namespace", input.Namespace)
- return KubernetesGetPodResourcesOutput{
- ToolName: "kubernetes_get_pod_resources",
+ output := KubernetesGetPodResourcesOutput{
Pods: []PodResource{
{
PodName: "order-service-pod-1",
@@ -378,7 +425,12 @@ func kubernetesGetPodResources(ctx *ai.ToolContext, input
KubernetesGetPodResour
MemoryLimitMi: 2048,
},
},
- Summary: "2 out of 2 pods are near their memory limits",
+ }
+
+ return ToolOutput{
+ ToolName: "kubernetes_get_pod_resources",
+ Summary: "2 out of 2 pods are near their memory limits",
+ Result: output,
}, nil
}
@@ -387,7 +439,11 @@ func kubernetesGetPodResources(ctx *ai.ToolContext, input
KubernetesGetPodResour
// ================================================
type DubboServiceStatusInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the Dubbo service"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the Dubbo service"`
+}
+
+func (i DubboServiceStatusInput) String() string {
+ return fmt.Sprintf("DubboServiceStatusInput{ServiceName: %s}",
i.ServiceName)
}
type DubboProvider struct {
@@ -403,20 +459,18 @@ type DubboConsumer struct {
}
type DubboServiceStatusOutput struct {
- ToolName string `json:"toolName"`
Providers []DubboProvider `json:"providers"`
Consumers []DubboConsumer `json:"consumers"`
}
-func (o DubboServiceStatusOutput) Tool() string {
- return o.ToolName
+func (o DubboServiceStatusOutput) String() string {
+ return fmt.Sprintf("DubboServiceStatusOutput{Providers: %d items,
Consumers: %d items}", len(o.Providers), len(o.Consumers))
}
-func dubboServiceStatus(ctx *ai.ToolContext, input DubboServiceStatusInput)
(DubboServiceStatusOutput, error) {
- log.Printf("Tool 'dubbo_service_status' called for service: %s",
input.ServiceName)
+func dubboServiceStatus(ctx *ai.ToolContext, input DubboServiceStatusInput)
(ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'dubbo_service_status' called",
"service", input.ServiceName)
- return DubboServiceStatusOutput{
- ToolName: "dubbo_service_status",
+ output := DubboServiceStatusOutput{
Providers: []DubboProvider{
{IP: "192.168.1.10", Port: 20880, Status: "healthy"},
{IP: "192.168.1.11", Port: 20880, Status: "healthy"},
@@ -425,6 +479,12 @@ func dubboServiceStatus(ctx *ai.ToolContext, input
DubboServiceStatusInput) (Dub
{IP: "192.168.1.20", Application: "web-frontend",
Status: "connected"},
{IP: "192.168.1.21", Application: "api-gateway",
Status: "connected"},
},
+ }
+
+ return ToolOutput{
+ ToolName: "dubbo_service_status",
+ Summary: fmt.Sprintf("服务 %s 的提供者和消费者状态查询完成",
input.ServiceName),
+ Result: output,
}, nil
}
@@ -433,9 +493,13 @@ func dubboServiceStatus(ctx *ai.ToolContext, input
DubboServiceStatusInput) (Dub
// ================================================
type QueryLogDatabaseInput struct {
- ServiceName string `json:"serviceName"
jsonschema:"required,description=The name of the service"`
+ ServiceName string `json:"service_name"
jsonschema:"required,description=The name of the service"`
Keyword string `json:"keyword"
jsonschema:"required,description=Keyword to search for"`
- TimeRangeMinutes int `json:"timeRangeMinutes"
jsonschema:"required,description=Time range in minutes"`
+ TimeRangeMinutes int `json:"time_range_minutes"
jsonschema:"required,description=Time range in minutes"`
+}
+
+func (i QueryLogDatabaseInput) String() string {
+ return fmt.Sprintf("QueryLogDatabaseInput{ServiceName: %s, Keyword: %s,
TimeRangeMinutes: %d}", i.ServiceName, i.Keyword, i.TimeRangeMinutes)
}
type LogEntry struct {
@@ -445,21 +509,18 @@ type LogEntry struct {
}
type QueryLogDatabaseOutput struct {
- ToolName string `json:"toolName"`
- TotalHits int `json:"totalHits"`
+ TotalHits int `json:"total_hits"`
Logs []LogEntry `json:"logs"`
- Summary string `json:"summary"`
}
-func (o QueryLogDatabaseOutput) Tool() string {
- return o.ToolName
+func (o QueryLogDatabaseOutput) String() string {
+ return fmt.Sprintf("QueryLogDatabaseOutput{TotalHits: %d, Logs: %d
items}", o.TotalHits, len(o.Logs))
}
-func queryLogDatabase(ctx *ai.ToolContext, input QueryLogDatabaseInput)
(QueryLogDatabaseOutput, error) {
- log.Printf("Tool 'query_log_database' called for service: %s, keyword:
%s", input.ServiceName, input.Keyword)
+func queryLogDatabase(ctx *ai.ToolContext, input QueryLogDatabaseInput)
(ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'query_log_database' called", "service",
input.ServiceName, "keyword", input.Keyword)
- return QueryLogDatabaseOutput{
- ToolName: "query_log_database",
+ output := QueryLogDatabaseOutput{
TotalHits: 152,
Logs: []LogEntry{
{
@@ -473,7 +534,12 @@ func queryLogDatabase(ctx *ai.ToolContext, input
QueryLogDatabaseInput) (QueryLo
Message: "Timeout waiting for idle object in
database connection pool.",
},
},
- Summary: fmt.Sprintf("在过去%d分钟内,发现 152 条关于 '%s' 的日志条目",
input.TimeRangeMinutes, input.Keyword),
+ }
+
+ return ToolOutput{
+ ToolName: "query_log_database",
+ Summary: fmt.Sprintf("在过去%d分钟内,发现 152 条关于 '%s' 的日志条目",
input.TimeRangeMinutes, input.Keyword),
+ Result: output,
}, nil
}
@@ -482,32 +548,33 @@ func queryLogDatabase(ctx *ai.ToolContext, input
QueryLogDatabaseInput) (QueryLo
// ================================================
type SearchArchivedLogsInput struct {
- FilePathPattern string `json:"filePathPattern"
jsonschema:"required,description=File path pattern to search"`
- GrepKeyword string `json:"grepKeyword"
jsonschema:"required,description=Keyword to grep for"`
+ FilePathPattern string `json:"file_path_pattern"
jsonschema:"required,description=File path pattern to search"`
+ GrepKeyword string `json:"grep_keyword"
jsonschema:"required,description=Keyword to grep for"`
+}
+
+func (i SearchArchivedLogsInput) String() string {
+ return fmt.Sprintf("SearchArchivedLogsInput{FilePathPattern: %s,
GrepKeyword: %s}", i.FilePathPattern, i.GrepKeyword)
}
type MatchingLine struct {
- FilePath string `json:"filePath"`
- LineNumber int `json:"lineNumber"`
- LineContent string `json:"lineContent"`
+ FilePath string `json:"file_path"`
+ LineNumber int `json:"line_number"`
+ LineContent string `json:"line_content"`
}
type SearchArchivedLogsOutput struct {
- ToolName string `json:"toolName"`
- FilesSearched int `json:"filesSearched"`
- MatchingLines []MatchingLine `json:"matchingLines"`
- Summary string `json:"summary"`
+ FilesSearched int `json:"files_searched"`
+ MatchingLines []MatchingLine `json:"matching_lines"`
}
-func (o SearchArchivedLogsOutput) Tool() string {
- return o.ToolName
+func (o SearchArchivedLogsOutput) String() string {
+ return fmt.Sprintf("SearchArchivedLogsOutput{FilesSearched: %d,
MatchingLines: %d items}", o.FilesSearched, len(o.MatchingLines))
}
-func searchArchivedLogs(ctx *ai.ToolContext, input SearchArchivedLogsInput)
(SearchArchivedLogsOutput, error) {
- log.Printf("Tool 'search_archived_logs' called with pattern: %s,
keyword: %s", input.FilePathPattern, input.GrepKeyword)
+func searchArchivedLogs(ctx *ai.ToolContext, input SearchArchivedLogsInput)
(ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'search_archived_logs' called",
"pattern", input.FilePathPattern, "keyword", input.GrepKeyword)
- return SearchArchivedLogsOutput{
- ToolName: "search_archived_logs",
+ output := SearchArchivedLogsOutput{
FilesSearched: 5,
MatchingLines: []MatchingLine{
{
@@ -521,7 +588,12 @@ func searchArchivedLogs(ctx *ai.ToolContext, input
SearchArchivedLogsInput) (Sea
LineContent: "Query_time: 28.1s | SELECT
COUNT(id), SUM(price) FROM orders WHERE user_id = 'VIP_USER_456';",
},
},
- Summary: "在归档的日志文件中,发现了多条查询,搜索了 5 个文件,找到了 2 条匹配行",
+ }
+
+ return ToolOutput{
+ ToolName: "search_archived_logs",
+ Summary: "在归档的日志文件中,发现了多条查询,搜索了 5 个文件,找到了 2 条匹配行",
+ Result: output,
}, nil
}
@@ -530,29 +602,31 @@ func searchArchivedLogs(ctx *ai.ToolContext, input
SearchArchivedLogsInput) (Sea
// ================================================
type QueryKnowledgeBaseInput struct {
- QueryText string `json:"queryText"
jsonschema:"required,description=Query text to search for"`
+ QueryText string `json:"query_text"
jsonschema:"required,description=Query text to search for"`
+}
+
+func (i QueryKnowledgeBaseInput) String() string {
+ return fmt.Sprintf("QueryKnowledgeBaseInput{QueryText: %s}",
i.QueryText)
}
type KnowledgeDocument struct {
Source string `json:"source"`
- ContentSnippet string `json:"contentSnippet"`
- SimilarityScore float64 `json:"similarityScore"`
+ ContentSnippet string `json:"content_snippet"`
+ SimilarityScore float64 `json:"similarity_score"`
}
type QueryKnowledgeBaseOutput struct {
- ToolName string `json:"toolName"`
Documents []KnowledgeDocument `json:"documents"`
}
-func (o QueryKnowledgeBaseOutput) Tool() string {
- return o.ToolName
+func (o QueryKnowledgeBaseOutput) String() string {
+ return fmt.Sprintf("QueryKnowledgeBaseOutput{Documents: %d items}",
len(o.Documents))
}
-func queryKnowledgeBase(ctx *ai.ToolContext, input QueryKnowledgeBaseInput)
(QueryKnowledgeBaseOutput, error) {
- log.Printf("Tool 'query_knowledge_base' called with query: %s",
input.QueryText)
+func queryKnowledgeBase(ctx *ai.ToolContext, input QueryKnowledgeBaseInput)
(ToolOutput, error) {
+ manager.GetLogger().Info("Tool 'query_knowledge_base' called", "query",
input.QueryText)
- return QueryKnowledgeBaseOutput{
- ToolName: "query_knowledge_base",
+ output := QueryKnowledgeBaseOutput{
Documents: []KnowledgeDocument{
{
Source:
"Project-VIP-Feature-Design-Doc.md",
@@ -560,6 +634,12 @@ func queryKnowledgeBase(ctx *ai.ToolContext, input
QueryKnowledgeBaseInput) (Que
SimilarityScore: 0.92,
},
},
+ }
+
+ return ToolOutput{
+ ToolName: "query_knowledge_base",
+ Summary: fmt.Sprintf("知识库查询 '%s' 完成", input.QueryText),
+ Result: output,
}, nil
}
@@ -567,11 +647,13 @@ func queryKnowledgeBase(ctx *ai.ToolContext, input
QueryKnowledgeBaseInput) (Que
// Tool Registration Function
// ================================================
-var registeredTools []ai.Tool
+type MockToolManager struct {
+ registry *genkit.Genkit
+ tools []ai.Tool
+}
-// RegisterAllMockTools registers all mock diagnostic tools with the genkit
instance
-func RegisterAllMockTools(g *genkit.Genkit) {
- registeredTools = []ai.Tool{
+func NewMockToolManager(g *genkit.Genkit) *MockToolManager {
+ tools := []ai.Tool{
genkit.DefineTool(g, "prometheus_query_service_latency",
"查询指定服务在特定时间范围内的 P95/P99 延迟指标", prometheusQueryServiceLatency),
genkit.DefineTool(g, "prometheus_query_service_traffic",
"查询指定服务在特定时间范围内的请求率 (QPS) 和错误率 (Error Rate)", prometheusQueryServiceTraffic),
genkit.DefineTool(g, "query_timeseries_database", "执行一条完整的
PromQL 查询语句,用于进行普罗米修斯历史数据的深度或自定义分析", queryTimeseriesDatabase),
@@ -587,37 +669,35 @@ func RegisterAllMockTools(g *genkit.Genkit) {
genkit.DefineTool(g, "query_knowledge_base",
"在向量数据库中查询与问题相关的历史故障报告或解决方案文档", queryKnowledgeBase),
}
- log.Printf("Registered %d mock diagnostic tools", len(registeredTools))
+ manager.GetLogger().Info("Registered mock diagnostic tools", "count",
len(tools))
+
+ return &MockToolManager{
+ registry: g,
+ tools: tools,
+ }
+}
+
+func (mtm *MockToolManager) Register(tools ...ai.Tool) {
+ mtm.tools = append(mtm.tools, tools...)
}
-func AllMockToolRef() (toolRef []ai.ToolRef, err error) {
- if registeredTools == nil {
+func (mtm *MockToolManager) AllToolRefs() (toolRef []ai.ToolRef, err error) {
+ if mtm.tools == nil {
return nil, fmt.Errorf("no mock tools registered")
}
- for _, tool := range registeredTools {
+ for _, tool := range mtm.tools {
toolRef = append(toolRef, tool)
}
return toolRef, nil
}
-func AllMockToolNames() (toolNames map[string]struct{}, err error) {
- if registeredTools == nil {
+func (mtm *MockToolManager) AllToolNames() (toolNames map[string]struct{}, err
error) {
+ if mtm.tools == nil {
return nil, fmt.Errorf("no mock tools registered")
}
toolNames = make(map[string]struct{})
- for _, tool := range registeredTools {
+ for _, tool := range mtm.tools {
toolNames[tool.Name()] = struct{}{}
}
return toolNames, nil
}
-
-func MockToolRefByNames(names []string) (toolRefs []ai.ToolRef) {
- for _, name := range names {
- for _, tool := range registeredTools {
- if tool.Name() == name {
- toolRefs = append(toolRefs, tool)
- }
- }
- }
- return toolRefs
-}
diff --git a/ai/internal/tools/tools.go b/ai/internal/tools/tools.go
index 17a7bded..8e1bd78e 100644
--- a/ai/internal/tools/tools.go
+++ b/ai/internal/tools/tools.go
@@ -4,36 +4,94 @@ import (
"context"
"encoding/json"
"fmt"
+ "reflect"
+ "github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/genkit"
+ "github.com/mitchellh/mapstructure"
)
-// type ToolOut interface {
-// FromRaw(any) ToolOutput
-// Output() string
-// }
+type ToolInput struct {
+ ToolName string `json:"tool_name"`
+ Parameter any `json:"parameter"`
+}
+
+func (ti ToolInput) String() string {
+ return fmt.Sprintf("ToolInput{Input: %v}", ti.Parameter)
+}
+
+type ToolOutput struct {
+ ToolName string `json:"tool_name"`
+ Summary string `json:"summary"`
+ Result any `json:"result"`
+}
+
+func (to ToolOutput) Map() map[string]any {
+ result := make(map[string]any)
+ result["tool_name"] = to.ToolName
+ result["summary"] = to.Summary
+ result["result"] = to.Result
+ return result
+}
+
+func (to ToolOutput) String() string {
+ result := "ToolOutput{\n"
+ result += fmt.Sprintf(" ToolName: %s\n", to.ToolName)
+ result += fmt.Sprintf(" Summary: %s\n", to.Summary)
+
+ // Format Result based on its type
+ if to.Result != nil {
+ if resultJSON, err := json.MarshalIndent(to.Result, " ", "
"); err == nil {
+ result += fmt.Sprintf(" Result: %s\n",
string(resultJSON))
+ } else {
+ result += fmt.Sprintf(" Result: %v\n", to.Result)
+ }
+ } else {
+ result += " Result: <nil>\n"
+ }
-// func (s StringOutput) Output() string {
-// return s.value
-// }
+ result += "}"
+ return result
+}
+
+type ToolManager interface {
+ AllToolRefs()
+ AllToolNames()
+ Register(...ai.Tool)
+}
-// ToolDesc 表示要执行的工具调用信息
-type ToolDesc struct {
- ToolName string `json:"tool_name"`
- ToolInput map[string]any `json:"tool_input"`
+type ToolMetadata struct {
+ ID string `json:"id"`
+ Description string `json:"description"`
+ InputType reflect.Type `json:"inputType"`
+ OutputType reflect.Type `json:"outputType"`
}
-func (toolDesc ToolDesc) Call(g *genkit.Genkit, ctx context.Context) (string,
error) {
- tool := genkit.LookupTool(g, toolDesc.ToolName)
+func (toolInput ToolInput) Call(g *genkit.Genkit, ctx context.Context)
(toolOutput ToolOutput, err error) {
+ tool := genkit.LookupTool(g, toolInput.ToolName)
if tool == nil {
- return "", fmt.Errorf("tool not found: %s", toolDesc.ToolName)
+ return toolOutput, fmt.Errorf("tool not found: %s",
toolInput.ToolName)
+ }
+
+ // Pass Parameter directly to the tool, not the entire ToolInput
+ rawToolOutput, err := tool.RunRaw(ctx, toolInput.Parameter)
+ if err != nil {
+ return toolOutput, fmt.Errorf("failed to call tool %s: %w",
toolInput.ToolName, err)
+ }
+
+ if rawToolOutput == nil {
+ return toolOutput, fmt.Errorf("tool %s returned nil output",
toolInput.ToolName)
}
- rawToolOutput, err := tool.RunRaw(ctx, toolDesc.ToolInput)
+ err = mapstructure.Decode(rawToolOutput, &toolOutput)
if err != nil {
- return "", fmt.Errorf("failed to call tool %s: %w",
toolDesc.ToolName, err)
+ return toolOutput, fmt.Errorf("failed to decode tool output for
%s: %w", toolInput.ToolName, err)
+ }
+
+ // Ensure ToolName is set
+ if toolOutput.ToolName == "" {
+ toolOutput.ToolName = toolInput.ToolName
}
- jsonOutput, err := json.Marshal(rawToolOutput)
- return string(jsonOutput), err
+ return toolOutput, nil
}
diff --git a/ai/main.go b/ai/main.go
index 9177148b..d76b42b2 100644
--- a/ai/main.go
+++ b/ai/main.go
@@ -1,13 +1,25 @@
package main
import (
- "dubbo-admin-ai/internal/agent"
+ "dubbo-admin-ai/agent/react"
+ "dubbo-admin-ai/internal/manager"
"dubbo-admin-ai/plugins/dashscope"
+ "fmt"
)
func main() {
- if err := agent.InitAgent(dashscope.Qwen3); err != nil {
- panic(err)
+ manager.Init(dashscope.Qwen3.Key(), manager.PrettyLogger())
+ reActAgent := react.Create(manager.GetRegistry())
+
+ agentInput := react.ThinkIn{
+ Content: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
+ }
+
+ _, resp, err := reActAgent.Interact(agentInput)
+ if err != nil {
+ fmt.Printf("failed to run interaction: %v\n", err)
+ return
}
- select {}
+
+ fmt.Printf("Final Response: %v\n", resp)
}
diff --git a/ai/plugins/dashscope/dashscope.go
b/ai/plugins/dashscope/dashscope.go
index df838530..df81a145 100644
--- a/ai/plugins/dashscope/dashscope.go
+++ b/ai/plugins/dashscope/dashscope.go
@@ -2,11 +2,11 @@ package dashscope
import (
"context"
- "fmt"
+ "dubbo-admin-ai/plugins/model"
"os"
"github.com/firebase/genkit/go/ai"
- "github.com/firebase/genkit/go/core"
+ "github.com/firebase/genkit/go/core/api"
"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/plugins/compat_oai"
"github.com/openai/openai-go/option"
@@ -21,44 +21,24 @@ const (
qwen_plus = "qwen-plus"
qwen_flash = "qwen-flash"
qwen3_coder = "qwen3-coder-plus"
-
- Qwen3 = provider + "/" + qwen3_235b_a22b
- Qwen_plus = provider + "/" + qwen_plus
- Qwen_max = provider + "/" + qwen_max
- Qwen3_coder = provider + "/" + qwen3_coder
- Qwen_flash = provider + "/" + qwen_flash
)
var (
- supportedModels = map[string]ai.ModelInfo{
- qwen3_235b_a22b: {
- Label: "qwen3-235b-a22b-instruct-2507",
- Supports: &compat_oai.BasicText,
- Versions: []string{"qwen3-235b-a22b-instruct-2507"},
- },
- qwen_plus: {
- Label: "qwen-plus",
- Supports: &compat_oai.BasicText,
- Versions: []string{"qwen-plus"},
- },
- qwen_max: {
- Label: "qwen-max",
- Supports: &compat_oai.BasicText,
- Versions: []string{"qwen-max"},
- },
- qwen3_coder: {
- Label: "qwen3-coder",
- Supports: &compat_oai.BasicText,
- Versions: []string{"qwen3-coder"},
- },
- qwen_flash: {
- Label: "qwen-flash",
- Supports: &compat_oai.BasicText,
- Versions: []string{"qwen-flash"},
- },
+ Qwen3 = model.New(provider, qwen3_235b_a22b, compat_oai.BasicText)
+ Qwen_plus = model.New(provider, qwen_plus, compat_oai.BasicText)
+ Qwen_max = model.New(provider, qwen_max, compat_oai.BasicText)
+ Qwen3_coder = model.New(provider, qwen3_coder, compat_oai.BasicText)
+ Qwen_flash = model.New(provider, qwen_flash, compat_oai.BasicText)
+
+ supportedModels = []model.Model{
+ Qwen3,
+ Qwen_plus,
+ Qwen_max,
+ Qwen3_coder,
+ Qwen_flash,
}
- knownEmbedders = []string{}
+ // supportedEmbeddingModels = []string{}
)
type DashScope struct {
@@ -75,7 +55,7 @@ func (o *DashScope) Name() string {
}
// Init implements genkit.Plugin.
-func (o *DashScope) Init(ctx context.Context, g *genkit.Genkit) error {
+func (o *DashScope) Init(ctx context.Context) []api.Action {
apiKey := o.APIKey
// if api key is not set, get it from environment variable
@@ -84,7 +64,7 @@ func (o *DashScope) Init(ctx context.Context, g
*genkit.Genkit) error {
}
if apiKey == "" {
- return fmt.Errorf("DashScope plugin initialization failed:
apiKey is required")
+ panic("DashScope plugin initialization failed: apiKey is
required")
}
if o.openAICompatible == nil {
@@ -102,47 +82,40 @@ func (o *DashScope) Init(ctx context.Context, g
*genkit.Genkit) error {
}
o.openAICompatible.Provider = provider
- if err := o.openAICompatible.Init(ctx, g); err != nil {
- return err
- }
+ compatActions := o.openAICompatible.Init(ctx)
- // define default models
- for model, info := range supportedModels {
- if _, err := o.DefineModel(g, model, info); err != nil {
- return err
- }
- }
+ var actions []api.Action
+ actions = append(actions, compatActions...)
- // define default embedders
- for _, embedder := range knownEmbedders {
- if _, err := o.DefineEmbedder(g, embedder); err != nil {
- return err
- }
+ // define default models
+ for _, model := range supportedModels {
+ actions = append(actions, o.DefineModel(model.Key(),
model.Info()).(api.Action))
}
+ //TODO: define default embedders
- return nil
+ return actions
}
func (o *DashScope) Model(g *genkit.Genkit, name string) ai.Model {
- return o.openAICompatible.Model(g, name, provider)
+ return o.openAICompatible.Model(g, api.NewName(provider, name))
}
-func (o *DashScope) DefineModel(g *genkit.Genkit, name string, info
ai.ModelInfo) (ai.Model, error) {
- return o.openAICompatible.DefineModel(g, provider, name, info)
+func (o *DashScope) DefineModel(id string, opts ai.ModelOptions) ai.Model {
+ return o.openAICompatible.DefineModel(provider, id, opts)
}
-func (o *DashScope) DefineEmbedder(g *genkit.Genkit, name string)
(ai.Embedder, error) {
- return o.openAICompatible.DefineEmbedder(g, provider, name)
+func (o *DashScope) DefineEmbedder(id string, opts *ai.EmbedderOptions)
ai.Embedder {
+ return o.openAICompatible.DefineEmbedder(provider, id, opts)
}
func (o *DashScope) Embedder(g *genkit.Genkit, name string) ai.Embedder {
- return o.openAICompatible.Embedder(g, name, provider)
+ return o.openAICompatible.Embedder(g, api.NewName(provider, name))
}
-func (o *DashScope) ListActions(ctx context.Context) []core.ActionDesc {
+func (o *DashScope) ListActions(ctx context.Context) []api.ActionDesc {
return o.openAICompatible.ListActions(ctx)
}
-func (o *DashScope) ResolveAction(g *genkit.Genkit, atype core.ActionType,
name string) error {
- return o.openAICompatible.ResolveAction(g, atype, name)
+func (o *DashScope) ResolveAction(atype api.ActionType, name string)
api.Action {
+ return o.openAICompatible.ResolveAction(atype, name)
}
diff --git a/ai/plugins/model/model.go b/ai/plugins/model/model.go
new file mode 100644
index 00000000..cb2d5764
--- /dev/null
+++ b/ai/plugins/model/model.go
@@ -0,0 +1,32 @@
+package model
+
+import (
+ "github.com/firebase/genkit/go/ai"
+)
+
+type Model struct {
+ provider string
+ label string // Label is the internal model representation of
different providers.
+ info ai.ModelSupports
+}
+
+func New(provider string, label string, info ai.ModelSupports) Model {
+ return Model{
+ provider: provider,
+ label: label,
+ info: info,
+ }
+}
+
+// Key is the model query string of genkit registry.
+func (m Model) Key() string {
+ return m.provider + "/" + m.label
+}
+
+func (m Model) Info() ai.ModelOptions {
+ return ai.ModelOptions{
+ Label: m.label,
+ Supports: &m.info,
+ Versions: []string{m.label},
+ }
+}
diff --git a/ai/plugins/siliconflow/siliconflow.go
b/ai/plugins/siliconflow/siliconflow.go
index 00597baf..e5b5b06a 100644
--- a/ai/plugins/siliconflow/siliconflow.go
+++ b/ai/plugins/siliconflow/siliconflow.go
@@ -2,11 +2,11 @@ package siliconflow
import (
"context"
- "fmt"
+ "dubbo-admin-ai/plugins/model"
"os"
"github.com/firebase/genkit/go/ai"
- "github.com/firebase/genkit/go/core"
+ "github.com/firebase/genkit/go/core/api"
"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/plugins/compat_oai"
"github.com/openai/openai-go/option"
@@ -20,38 +20,22 @@ const (
deepseekR1 = "deepseek-ai/DeepSeek-R1"
qwenQwQ32B = "Qwen/QwQ-32B"
qwen3Coder = "Qwen/Qwen3-Coder-480B-A35B-Instruct"
-
- DeepSeekV3 = provider + "/" + deepseekV3
- QwenQwQ32B = provider + "/" + qwenQwQ32B
- Qwen3Coder = provider + "/" + qwen3Coder
- DeepSeekR1 = provider + "/" + deepseekR1
)
var (
- supportedModels = map[string]ai.ModelInfo{
- deepseekV3: {
- Label: "deepseek-ai/DeepSeek-V3",
- Supports: &compat_oai.BasicText,
- Versions: []string{"DeepSeek-V3-0324"},
- },
- qwen3Coder: {
- Label: "Qwen/Qwen3-Coder-480B-A35B-Instruct",
- Supports: &compat_oai.Multimodal,
- Versions: []string{"Qwen3-Coder-480B-A35B"},
- },
- qwenQwQ32B: {
- Label: "Qwen/QwQ-32B",
- Supports: &compat_oai.BasicText,
- Versions: []string{"QwQ-32B"},
- },
- deepseekR1: {
- Label: "deepseek-ai/DeepSeek-R1",
- Supports: &compat_oai.BasicText,
- Versions: []string{"DeepSeek-R1-0528"},
- },
+ DeepSeekV3 = model.New(provider, deepseekV3, compat_oai.BasicText)
+ QwenQwQ32B = model.New(provider, qwenQwQ32B, compat_oai.BasicText)
+ Qwen3Coder = model.New(provider, qwen3Coder, compat_oai.Multimodal)
+ DeepSeekR1 = model.New(provider, deepseekR1, compat_oai.BasicText)
+
+ supportedModels = []model.Model{
+ DeepSeekV3,
+ QwenQwQ32B,
+ Qwen3Coder,
+ DeepSeekR1,
}
- knownEmbedders = []string{}
+ // supportedEmbeddingModels = []string{}
)
type SiliconFlow struct {
@@ -67,8 +51,7 @@ func (o *SiliconFlow) Name() string {
return provider
}
-// Init implements genkit.Plugin.
-func (o *SiliconFlow) Init(ctx context.Context, g *genkit.Genkit) error {
+func (o *SiliconFlow) Init(ctx context.Context) []api.Action {
apiKey := o.APIKey
// if api key is not set, get it from environment variable
@@ -77,7 +60,7 @@ func (o *SiliconFlow) Init(ctx context.Context, g
*genkit.Genkit) error {
}
if apiKey == "" {
- return fmt.Errorf("siliconflow plugin initialization failed:
apiKey is required")
+ panic("SiliconFlow plugin initialization failed: apiKey is
required")
}
if o.openAICompatible == nil {
@@ -95,47 +78,40 @@ func (o *SiliconFlow) Init(ctx context.Context, g
*genkit.Genkit) error {
}
o.openAICompatible.Provider = provider
- if err := o.openAICompatible.Init(ctx, g); err != nil {
- return err
- }
+ compatActions := o.openAICompatible.Init(ctx)
- // define default models
- for model, info := range supportedModels {
- if _, err := o.DefineModel(g, model, info); err != nil {
- return err
- }
- }
+ var actions []api.Action
+ actions = append(actions, compatActions...)
- // define default embedders
- for _, embedder := range knownEmbedders {
- if _, err := o.DefineEmbedder(g, embedder); err != nil {
- return err
- }
+ // define default models
+ for _, model := range supportedModels {
+ actions = append(actions, o.DefineModel(model.Key(),
model.Info()).(api.Action))
}
+ //TODO: define default embedders
- return nil
+ return actions
}
func (o *SiliconFlow) Model(g *genkit.Genkit, name string) ai.Model {
- return o.openAICompatible.Model(g, name, provider)
+ return o.openAICompatible.Model(g, api.NewName(provider, name))
}
-func (o *SiliconFlow) DefineModel(g *genkit.Genkit, name string, info
ai.ModelInfo) (ai.Model, error) {
- return o.openAICompatible.DefineModel(g, provider, name, info)
+func (o *SiliconFlow) DefineModel(id string, opts ai.ModelOptions) ai.Model {
+ return o.openAICompatible.DefineModel(provider, id, opts)
}
-func (o *SiliconFlow) DefineEmbedder(g *genkit.Genkit, name string)
(ai.Embedder, error) {
- return o.openAICompatible.DefineEmbedder(g, provider, name)
+func (o *SiliconFlow) DefineEmbedder(id string, opts *ai.EmbedderOptions)
ai.Embedder {
+ return o.openAICompatible.DefineEmbedder(provider, id, opts)
}
func (o *SiliconFlow) Embedder(g *genkit.Genkit, name string) ai.Embedder {
- return o.openAICompatible.Embedder(g, name, provider)
+ return o.openAICompatible.Embedder(g, api.NewName(provider, name))
}
-func (o *SiliconFlow) ListActions(ctx context.Context) []core.ActionDesc {
+func (o *SiliconFlow) ListActions(ctx context.Context) []api.ActionDesc {
return o.openAICompatible.ListActions(ctx)
}
-func (o *SiliconFlow) ResolveAction(g *genkit.Genkit, atype core.ActionType,
name string) error {
- return o.openAICompatible.ResolveAction(g, atype, name)
+func (o *SiliconFlow) ResolveAction(atype api.ActionType, name string)
api.Action {
+ return o.openAICompatible.ResolveAction(atype, name)
}
diff --git a/ai/schema/react_schema.go b/ai/schema/react_schema.go
new file mode 100644
index 00000000..2aab7abc
--- /dev/null
+++ b/ai/schema/react_schema.go
@@ -0,0 +1,136 @@
+package schema
+
+import (
+ "dubbo-admin-ai/internal/tools"
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "strings"
+
+ "github.com/firebase/genkit/go/ai"
+ "github.com/invopop/jsonschema"
+)
+
+// StreamChunk represents streaming status information for ReAct Agent
+type StreamChunk struct {
+ Stage string `json:"stage"` // "think" | "act"
+ Chunk *ai.ModelResponseChunk `json:"chunk"`
+}
+
+var (
+ UserThinkPromptTemplateWithSchema = `input:
+{{#if content}} content: {{content}} {{/if}}
+{{#if tool_responses}} tool_responses: {{tool_responses}} {{/if}}
+{{#if thought}} thought: {{thought}} {{/if}}
+The output must be a JSON object that conforms to the following schema:
+` + ThinkOutput{}.JsonSchema()
+
+ UserThinkPromptTemplate = `input:
+{{#if content}} content: {{content}} {{/if}}
+{{#if tool_responses}} tool_responses: {{tool_responses}} {{/if}}
+{{#if thought}} thought: {{thought}} {{/if}}
+The output must be a JSON object that conforms to the following schema:
+`
+)
+
+type ThinkInput struct {
+ Content string `json:"content,omitempty"`
+ ToolResponses []tools.ToolOutput `json:"tool_responses,omitempty"`
+}
+
+func (i ThinkInput) Validate(T reflect.Type) error {
+ if reflect.TypeOf(i) != T {
+ return fmt.Errorf("ThinkInput: %v is not of type %v", i, T)
+ }
+ return nil
+}
+
+func (i ThinkInput) String() string {
+ data, err := json.MarshalIndent(i, "", " ")
+ if err != nil {
+ return fmt.Sprintf("ThinkInput{error: %v}", err)
+ }
+ return string(data)
+}
+
+type ThinkOutput struct {
+ ToolRequests []tools.ToolInput `json:"tool_requests,omitempty"`
+ Thought string `json:"thought"`
+ Status Status `json:"status,omitempty"
jsonschema:"enum=CONTINUED,enum=FINISHED"`
+ FinalAnswer string `json:"final_answer,omitempty"
jsonschema:"required=false"`
+}
+
+func (ta ThinkOutput) JsonSchema() string {
+ // Create Reflector
+ reflector := &jsonschema.Reflector{
+ AllowAdditionalProperties: false,
+ RequiredFromJSONSchemaTags: true,
+ DoNotReference: true,
+ }
+
+ // Generate schema
+ schema := reflector.Reflect(ta)
+
+ // Convert to JSON
+ data, err := json.MarshalIndent(schema, "", " ")
+ if err != nil {
+ return fmt.Sprintf("JsonSchema{error: %v}", err)
+ }
+ return string(data)
+}
+
+func (ta ThinkOutput) Validate(T reflect.Type) error {
+ if reflect.TypeOf(ta) != T {
+ return fmt.Errorf("ThinkOutput: %v is not of type %v", ta, T)
+ }
+ return nil
+}
+
+func (ta ThinkOutput) String() string {
+ data, err := json.MarshalIndent(ta, "", " ")
+ if err != nil {
+ return fmt.Sprintf("ThinkOutput{error: %v}", err)
+ }
+ return string(data)
+}
+
+type ToolOutputs struct {
+ Outputs []tools.ToolOutput `json:"tool_responses"`
+}
+
+func (to ToolOutputs) Validate(T reflect.Type) error {
+ if reflect.TypeOf(to) != T {
+ return fmt.Errorf("ToolRequest: %v is not of type %v", to, T)
+ }
+ return nil
+}
+
+func (to *ToolOutputs) Add(output *tools.ToolOutput) {
+ to.Outputs = append(to.Outputs, *output)
+}
+
+func (to ToolOutputs) String() string {
+ if len(to.Outputs) == 0 {
+ return "ToolOutputs[\n <no outputs>\n]"
+ }
+
+ result := "ToolOutputs[\n"
+ for i, output := range to.Outputs {
+ // Indent each ToolOutput
+ outputStr := output.String()
+ lines := strings.Split(outputStr, "\n")
+ for j, line := range lines {
+ if j == len(lines)-1 && line == "" {
+ continue // Skip empty last line
+ }
+ result += " " + line + "\n"
+ }
+
+ // Add separator between outputs except for the last one
+ if i < len(to.Outputs)-1 {
+ result += "\n"
+ }
+ }
+ result += "]"
+ return result
+}
diff --git a/ai/schema/schema.go b/ai/schema/schema.go
new file mode 100644
index 00000000..820872b9
--- /dev/null
+++ b/ai/schema/schema.go
@@ -0,0 +1,17 @@
+package schema
+
+import (
+ "reflect"
+)
+
+type Status string
+
+const (
+ Continued Status = "CONTINUED"
+ Finished Status = "FINISHED"
+ // Pending Status = "PENDING"
+)
+
+type Schema interface {
+ Validate(T reflect.Type) error
+}
diff --git a/ai/test/flow_test.go b/ai/test/flow_test.go
deleted file mode 100644
index 39c6c3b6..00000000
--- a/ai/test/flow_test.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package test
-
-import (
- "context"
- "dubbo-admin-ai/internal/agent"
- "dubbo-admin-ai/internal/schema"
- "encoding/json"
- "fmt"
- "testing"
-)
-
-func TestThinking(t *testing.T) {
- ctx := context.Background()
- agentInput := schema.ReActInput{
- UserInput: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
- }
-
- resp, err := agent.ThinkingFlow.Run(ctx, agentInput)
- if err != nil {
- t.Fatalf("failed to run thinking flow: %v", err)
- }
- if resp == nil {
- t.Fatal("expected non-nil response")
- }
-
- fmt.Print(resp)
-}
-
-func TestAct(t *testing.T) {
- ctx := context.Background()
-
- actInJson := `{
- "tool_requests": [
- {
- "thought": "查询order-service的P95和P99延迟指标,以确认性能问题的严重程度。",
- "tool_desc": {
- "tool_name": "prometheus_query_service_latency",
- "tool_input": {
- "quantile": 0.95,
- "serviceName": "order-service",
- "timeRangeMinutes": 10
- }
- }
- },
- {
- "thought": "查询order-service的QPS和错误率,评估服务负载和健康状态。",
- "tool_desc": {
- "tool_name": "prometheus_query_service_traffic",
- "tool_input": {
- "serviceName": "order-service",
- "timeRangeMinutes": 10
- }
- }
- },
- {
- "thought": "分析order-service的链路追踪数据,找出延迟最高的下游调用。",
- "tool_desc": {
- "tool_name": "trace_latency_analysis",
- "tool_input": {
- "serviceName": "order-service",
- "timeRangeMinutes": 10
- }
- }
- },
- {
- "thought": "检查order-service的Dubbo提供者和消费者列表及其状态,确认服务依赖是否正常。",
- "tool_desc": {
- "tool_name": "dubbo_service_status",
- "tool_input": {
- "serviceName": "order-service"
- }
- }
- }
- ],
- "status": "CONTINUED",
- "thought":
"初步分析表明order-service的性能问题可能有多个潜在原因,包括高延迟的下游调用、资源不足、JVM问题或数据库连接池问题等。首先需要收集系统性能数据,以便进一步诊断。"
-}`
- actIn := &schema.ThinkAggregation{}
- if err := json.Unmarshal([]byte(actInJson), actIn); err != nil {
- t.Fatalf("failed to unmarshal actInJson: %v", err)
- }
-
- resp, err := agent.ActFlow.Run(ctx, actIn)
- if err != nil {
- t.Fatalf("failed to run act flow: %v", err)
- }
- if resp == nil {
- t.Fatal("expected non-nil response")
- }
-
- for _, r := range resp {
- fmt.Println(r)
- }
-}
-
-func TestReAct(t *testing.T) {
- ctx := context.Background()
- agentInput := schema.ReActInput{
- UserInput: "我的微服务 order-service 运行缓慢,请帮助我诊断原因",
- }
-
- reActResp, err := agent.ReActFlow.Run(ctx, agentInput)
- if err != nil {
- t.Fatalf("failed to run reAct flow: %v", err)
- }
- fmt.Println(reActResp)
-}
diff --git a/ai/test/llm_test.go b/ai/test/llm_test.go
index b2081eaa..71f13eaf 100644
--- a/ai/test/llm_test.go
+++ b/ai/test/llm_test.go
@@ -2,7 +2,9 @@ package test
import (
"context"
+ "dubbo-admin-ai/agent/react"
"dubbo-admin-ai/internal/manager"
+ "dubbo-admin-ai/plugins/dashscope"
"fmt"
"log"
"testing"
@@ -39,11 +41,10 @@ func defineWeatherFlow(g *genkit.Genkit)
*core.Flow[WeatherInput, string, struct
}
func TestTextGeneration(t *testing.T) {
+ manager.Init(dashscope.Qwen3.Key(), nil)
+ _ = react.Create(manager.GetRegistry())
ctx := context.Background()
- g, err := manager.GetGlobalGenkit()
- if err != nil {
- t.Fatalf("failed to initialize genkit: %v", err)
- }
+ g := manager.GetRegistry()
resp, err := genkit.GenerateText(ctx, g, ai.WithPrompt("Hello, Who are
you?"))
if err != nil {
@@ -55,11 +56,10 @@ func TestTextGeneration(t *testing.T) {
}
func TestWeatherFlowRun(t *testing.T) {
+ manager.Init(dashscope.Qwen3.Key(), nil)
+ _ = react.Create(manager.GetRegistry())
ctx := context.Background()
- g, err := manager.GetGlobalGenkit()
- if err != nil {
- t.Fatalf("failed to initialize genkit: %v", err)
- }
+ g := manager.GetRegistry()
flow := defineWeatherFlow(g)
flow.Run(ctx, WeatherInput{Location: "San Francisco"})
diff --git a/ai/test/test.go b/ai/test/test.go
deleted file mode 100644
index 2e302f44..00000000
--- a/ai/test/test.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package test
-
-import (
- "dubbo-admin-ai/internal/agent"
- "dubbo-admin-ai/plugins/dashscope"
-)
-
-func init() {
- if err := agent.InitAgent(dashscope.Qwen3); err != nil {
- panic(err)
- }
-}
diff --git a/ai/utils/utils.go b/ai/utils/utils.go
index 03ecc837..d6fa4870 100644
--- a/ai/utils/utils.go
+++ b/ai/utils/utils.go
@@ -7,43 +7,43 @@ import (
"path/filepath"
)
-// CopyFile 复制源文件内容到目标文件,如果目标文件不存在,则会创建该文件
-// srcPath: 源文件路径
-// dstPath: 目标文件路径
+// CopyFile copies source file content to target file, creates the file if
target doesn't exist
+// srcPath: source file path
+// dstPath: target file path
func CopyFile(srcPath, dstPath string) error {
- // 打开源文件
+ // Open source file
srcFile, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("failed to open source file %s: %w", srcPath,
err)
}
defer srcFile.Close()
- // 获取源文件信息
+ // Get source file info
srcInfo, err := srcFile.Stat()
if err != nil {
return fmt.Errorf("failed to get source file info %s: %w",
srcPath, err)
}
- // 确保目标目录存在
+ // Ensure target directory exists
dstDir := filepath.Dir(dstPath)
if err := os.MkdirAll(dstDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory %s: %w",
dstDir, err)
}
- // 创建或覆盖目标文件
+ // Create or overwrite target file
dstFile, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create target file %s: %w",
dstPath, err)
}
defer dstFile.Close()
- // 复制文件内容
+ // Copy file content
_, err = io.Copy(dstFile, srcFile)
if err != nil {
return fmt.Errorf("failed to copy file content: %w", err)
}
- // 设置目标文件权限与源文件相同
+ // Set target file permissions same as source file
if err := os.Chmod(dstPath, srcInfo.Mode()); err != nil {
return fmt.Errorf("failed to set file permissions: %w", err)
}