AlexStocks commented on code in PR #802: URL: https://github.com/apache/dubbo-go-samples/pull/802#discussion_r1985990016
########## llm/go-client/frontend/handlers/chat.go: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package handlers + +import ( + "context" + "io" + "log" + "net/http" + "runtime/debug" + "strings" + "time" +) + +import ( + "github.com/apache/dubbo-go-samples/llm/go-client/frontend/service" + chat "github.com/apache/dubbo-go-samples/llm/proto" + "github.com/gin-contrib/sessions" + "github.com/gin-gonic/gin" +) + +type ChatHandler struct { + svc chat.ChatService + ctxManager *service.ContextManager +} + +func NewChatHandler(svc chat.ChatService, mgr *service.ContextManager) *ChatHandler { + return &ChatHandler{ + svc: svc, + ctxManager: mgr, + } +} + +func (h *ChatHandler) Index(c *gin.Context) { + session := sessions.Default(c) + ctxID := session.Get("current_context") + if ctxID == nil { + ctxID = h.ctxManager.CreateContext() + session.Set("current_context", ctxID) + session.Save() + } + + c.HTML(http.StatusOK, "index.html", gin.H{ + "title": "LLM Chat", + }) +} + +func (h *ChatHandler) Chat(c *gin.Context) { + session := sessions.Default(c) + ctxID, ok := session.Get("current_context").(string) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid session context"}) + 
return + } + + var req struct { + Message string `json:"message"` + Bin string `json:"bin"` + } + + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request payload"}) + return + } + + sepIndex := strings.Index(",", req.Bin) + + messages := h.ctxManager.GetHistory(ctxID) + messages = append(messages, &chat.ChatMessage{ + Role: "human", + Content: req.Message, + Bin: []byte(req.Bin[sepIndex+1:]), + }) + + stream, err := h.svc.Chat(context.Background(), &chat.ChatRequest{ + Messages: messages, + }) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + defer func() { + if err := stream.Close(); err != nil { + log.Println("Error closing stream:", err) + } + }() + + // 设置响应头 + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "close") + + responseCh := make(chan string, 100) // 使用缓冲区 + + // 流处理协程 + go func() { + defer func() { + if r := recover(); r != nil { + log.Printf("Recovered in stream processing: %v\n%s", r, debug.Stack()) + } + close(responseCh) + }() + + for { + select { + case <-c.Request.Context().Done(): // 客户端断开 + log.Println("Client disconnected, stopping stream processing") + return + default: + if !stream.Recv() { + if err := stream.Err(); err != nil { + log.Printf("Stream receive error: %v", err) + } + return + } + content := stream.Msg().Content + responseCh <- content + } + } + }() + + // SSE stream output + c.Stream(func(w io.Writer) bool { + select { + case chunk, ok := <-responseCh: + if !ok { + return false + } + c.SSEvent("message", gin.H{"content": chunk}) + return true + case <-time.After(30 * time.Second): + log.Println("Stream timed out") + return false + case <-c.Request.Context().Done(): + log.Println("Client disconnected") + return false + } + }) +} + +func (h *ChatHandler) NewContext(c *gin.Context) { + session := sessions.Default(c) + newCtxID := h.ctxManager.CreateContext() + 
session.Set("current_context", newCtxID) + session.Save() + + c.JSON(http.StatusOK, gin.H{ + "context_id": newCtxID, + }) +} + +func (h *ChatHandler) ListContexts(c *gin.Context) { + session := sessions.Default(c) + currentCtx := session.Get("current_context").(string) + + h.ctxManager.Mu.RLock() + defer h.ctxManager.Mu.RUnlock() + + contexts := make([]string, 0, len(h.ctxManager.Contexts)) + for ctxID := range h.ctxManager.Contexts { + contexts = append(contexts, ctxID) + } + + c.JSON(http.StatusOK, gin.H{ + "current": currentCtx, + "contexts": contexts, + }) +} + +func (h *ChatHandler) SwitchContext(c *gin.Context) { + var req struct { + ContextID string `json:"context_id"` + } + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + h.ctxManager.Mu.RLock() + defer h.ctxManager.Mu.RUnlock() Review Comment: 这个 lock scope 会不会太大了?line 198 和 line 206 的网络回包也被 lock 进去了 ########## llm/go-client/frontend/main.go: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "fmt" + +import ( Review Comment: 用工具 format 下 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
