GitHub user AlexStocks created a discussion: LLM 推理 KV Cache 分布式缓存方案

## 概述

本文档描述如何在 Dubbo-Go-Pixiu AI 网关中实现 LLM 推理的 KV Cache 分布式缓存功能,通过将 KV Cache 卸载到 
Redis 集群,实现无状态 LLM 推理架构,大幅降低推理成本。

## 背景与痛点

### 当前 LLM 推理架构的局限性

1. **状态耦合问题**
   - KV Cache 绑定在特定推理服务器的 GPU 显存或本地内存
   - 多轮对话必须路由到同一服务器才能命中缓存
   - 缓存未命中需要重新执行预填充(Prefill),成本高达 10 倍

2. **弹性扩展受限**
   - 服务器宕机导致缓存丢失
   - 无法按需水平扩展推理实例
   - 负载均衡需要感知缓存分布,调度复杂度高

3. **资源利用率低**
   - 需要预留缓存空间避免冲突
   - 服务器无法满负载运行
   - GPU 资源浪费

### 解决方案:无状态 LLM 推理架构

借鉴互联网后端架构演进经验,将 KV Cache 从推理实例中剥离,卸载到分布式存储(Redis 集群),实现:

-  **无状态推理服务**:任意实例可处理任意请求
-  **100% 缓存命中率**:多轮对话共享全局 KV Cache
-  **弹性扩展**:按需增减推理实例,无状态迁移
-  **成本降低**:避免预填充重计算,降本 50%+

---

## 架构设计

### 整体架构图

```
┌──────────────────────────────────────────────────────────────┐
│                      AI Client Applications                  │
└────────────────────────────┬─────────────────────────────────┘
                             │
┌────────────────────────────▼─────────────────────────────────┐
│               Pixiu AI Gateway (Stateless)                   │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  AI KV Cache Filter (dgp.filter.ai.kvcache)            │  │
│  │  - Cache Key Generation                                │  │
│  │  - KV Cache Offload/Load                               │  │
│  │  - Cache Hit/Miss Handling                             │  │
│  └────────────────────────────────────────────────────────┘  │
└────────────────────────────┬─────────────────────────────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│Redis Cluster  │    │Redis Cluster  │    │Redis Cluster  │
│  Node 1       │    │  Node 2       │    │  Node 3       │
│(Master+Slave) │    │(Master+Slave) │    │(Master+Slave) │
└───────┬───────┘    └───────┬───────┘    └───────┬───────┘
        │                    │                    │
        └────────────────────┼────────────────────┘
                             │
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│LLM Inference  │    │LLM Inference  │    │LLM Inference  │
│  Instance 1   │    │  Instance 2   │    │  Instance N   │
│ (SGLang/vLLM) │    │ (SGLang/vLLM) │    │ (SGLang/vLLM) │
└───────────────┘    └───────────────┘    └───────────────┘
```

### 核心组件

#### 1. AI KV Cache Filter

位于 Pixiu 网关的请求处理链路中,负责:

- **缓存键生成**:基于会话 ID、模型名、Prompt Hash 生成唯一缓存键
- **KV Cache 卸载**:推理完成后,将 KV Cache 存入 Redis 集群
- **KV Cache 加载**:推理前,从 Redis 集群读取历史 KV Cache
- **缓存淘汰**:支持 TTL、LRU、容量上限等淘汰策略
- **性能优化**:零拷贝、批量操作、异步写入

#### 2. Redis 集群

- **部署模式**:Redis Cluster(主从复制 + 分片)
- **数据结构**:使用 String 类型存储序列化的 KV Cache
- **持久化**:RDB + AOF 混合持久化,防止数据丢失
- **高可用**:Sentinel 监控,自动故障转移

#### 3. LLM 推理实例(SGLang/vLLM)

- **状态剥离**:不维护本地 KV Cache,完全依赖 Redis
- **接口扩展**:支持接收外部 KV Cache 输入
- **兼容性**:适配 SGLang RadixAttention、vLLM PagedAttention

---

## 技术实现

### 3.1 目录结构

```
pkg/filter/ai/kvcache/
├── kvcache.go              # Filter 主逻辑
├── cache_manager.go        # 缓存管理器
├── redis_backend.go        # Redis 后端实现
├── cache_key.go            # 缓存键生成器
├── serializer.go           # KV Cache 序列化/反序列化
├── metrics.go              # 指标采集
└── config.go               # 配置定义
```

### 3.2 核心接口设计

#### KVCacheManager Interface

```go
// pkg/filter/ai/kvcache/cache_manager.go
package kvcache

import (
        "context"
        "time"
)

// KVCacheManager 管理 LLM 推理的 KV Cache
type KVCacheManager interface {
        // Get 获取 KV Cache
        Get(ctx context.Context, key string) (*KVCache, error)
        
        // Set 存储 KV Cache
        Set(ctx context.Context, key string, cache *KVCache, ttl time.Duration) 
error
        
        // Delete 删除 KV Cache
        Delete(ctx context.Context, key string) error
        
        // Exists 检查 KV Cache 是否存在
        Exists(ctx context.Context, key string) (bool, error)
        
        // GetMulti 批量获取 KV Cache
        GetMulti(ctx context.Context, keys []string) (map[string]*KVCache, 
error)
        
        // SetMulti 批量存储 KV Cache
        SetMulti(ctx context.Context, caches map[string]*KVCache, ttl 
time.Duration) error
        
        // Stats 返回缓存统计信息
        Stats() CacheStats
        
        // Close 关闭缓存管理器
        Close() error
}

// KVCache 表示 LLM 推理的 KV Cache 数据
type KVCache struct {
        SessionID   string    // 会话 ID
        ModelName   string    // 模型名称
        PromptHash  string    // Prompt 的 Hash
        Keys        []byte    // K Cache(序列化后的字节流)
        Values      []byte    // V Cache(序列化后的字节流)
        TokenCount  int       // Token 数量
        LayerCount  int       // 层数
        HeadCount   int       // 注意力头数
        HiddenSize  int       // 隐藏层大小
        CreatedAt   time.Time // 创建时间
        AccessedAt  time.Time // 最后访问时间
        Metadata    map[string]string // 扩展元数据
}

// CacheStats 缓存统计信息
type CacheStats struct {
        HitCount        int64   // 缓存命中次数
        MissCount       int64   // 缓存未命中次数
        HitRatio        float64 // 缓存命中率
        TotalKeys       int64   // 总缓存键数
        TotalSizeBytes  int64   // 总缓存大小(字节)
        AvgGetLatencyMs float64 // 平均读取延迟(毫秒)
        AvgSetLatencyMs float64 // 平均写入延迟(毫秒)
}
```

#### Redis Backend Implementation

```go
// pkg/filter/ai/kvcache/redis_backend.go
package kvcache

import (
        "context"
        "encoding/json"
        "fmt"
        "sync/atomic"
        "time"

        "github.com/go-redis/redis/v8"
)

// RedisKVCacheManager Redis 实现的 KV Cache 管理器
type RedisKVCacheManager struct {
        client    redis.UniversalClient
        keyPrefix string
        
        // 统计信息
        hitCount  int64
        missCount int64
        
        // 性能指标
        getTotalLatency int64 // 纳秒
        setTotalLatency int64 // 纳秒
        getCount        int64
        setCount        int64
}

// NewRedisKVCacheManager 创建 Redis KV Cache 管理器
func NewRedisKVCacheManager(config *RedisConfig) (*RedisKVCacheManager, error) {
        var client redis.UniversalClient
        
        if config.ClusterMode {
                // Redis Cluster 模式
                client = redis.NewClusterClient(&redis.ClusterOptions{
                        Addrs:        config.Addrs,
                        Password:     config.Password,
                        PoolSize:     config.PoolSize,
                        MinIdleConns: config.MinIdleConns,
                        DialTimeout:  config.DialTimeout,
                        ReadTimeout:  config.ReadTimeout,
                        WriteTimeout: config.WriteTimeout,
                })
        } else {
                // Redis 单机/Sentinel 模式
                client = redis.NewClient(&redis.Options{
                        Addr:         config.Addrs[0],
                        Password:     config.Password,
                        DB:           config.DB,
                        PoolSize:     config.PoolSize,
                        MinIdleConns: config.MinIdleConns,
                        DialTimeout:  config.DialTimeout,
                        ReadTimeout:  config.ReadTimeout,
                        WriteTimeout: config.WriteTimeout,
                })
        }
        
        // 测试连接
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := client.Ping(ctx).Err(); err != nil {
                return nil, fmt.Errorf("failed to connect to Redis: %w", err)
        }
        
        return &RedisKVCacheManager{
                client:    client,
                keyPrefix: config.KeyPrefix,
        }, nil
}

// Get 获取 KV Cache
func (m *RedisKVCacheManager) Get(ctx context.Context, key string) (*KVCache, 
error) {
        start := time.Now()
        defer func() {
                atomic.AddInt64(&m.getTotalLatency, 
time.Since(start).Nanoseconds())
                atomic.AddInt64(&m.getCount, 1)
        }()
        
        redisKey := m.makeRedisKey(key)
        data, err := m.client.Get(ctx, redisKey).Bytes()
        if err == redis.Nil {
                atomic.AddInt64(&m.missCount, 1)
                return nil, ErrCacheNotFound
        }
        if err != nil {
                return nil, fmt.Errorf("failed to get cache from Redis: %w", 
err)
        }
        
        // 反序列化
        var cache KVCache
        if err := json.Unmarshal(data, &cache); err != nil {
                return nil, fmt.Errorf("failed to unmarshal cache: %w", err)
        }
        
        // 更新访问时间
        cache.AccessedAt = time.Now()
        atomic.AddInt64(&m.hitCount, 1)
        
        return &cache, nil
}

// Set 存储 KV Cache
func (m *RedisKVCacheManager) Set(ctx context.Context, key string, cache 
*KVCache, ttl time.Duration) error {
        start := time.Now()
        defer func() {
                atomic.AddInt64(&m.setTotalLatency, 
time.Since(start).Nanoseconds())
                atomic.AddInt64(&m.setCount, 1)
        }()
        
        // 序列化
        data, err := json.Marshal(cache)
        if err != nil {
                return fmt.Errorf("failed to marshal cache: %w", err)
        }
        
        redisKey := m.makeRedisKey(key)
        if err := m.client.Set(ctx, redisKey, data, ttl).Err(); err != nil {
                return fmt.Errorf("failed to set cache to Redis: %w", err)
        }
        
        return nil
}

// GetMulti 批量获取 KV Cache
func (m *RedisKVCacheManager) GetMulti(ctx context.Context, keys []string) 
(map[string]*KVCache, error) {
        if len(keys) == 0 {
                return make(map[string]*KVCache), nil
        }
        
        // 构建 Redis 键列表
        redisKeys := make([]string, len(keys))
        for i, key := range keys {
                redisKeys[i] = m.makeRedisKey(key)
        }
        
        // 使用 Pipeline 批量获取
        pipe := m.client.Pipeline()
        cmds := make([]*redis.StringCmd, len(redisKeys))
        for i, redisKey := range redisKeys {
                cmds[i] = pipe.Get(ctx, redisKey)
        }
        
        if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
                // 忽略 redis.Nil 错误(部分键不存在是正常的)
                if !isRedisNilError(err) {
                        return nil, fmt.Errorf("failed to execute pipeline: 
%w", err)
                }
        }
        
        // 解析结果
        result := make(map[string]*KVCache)
        for i, cmd := range cmds {
                data, err := cmd.Bytes()
                if err == redis.Nil {
                        atomic.AddInt64(&m.missCount, 1)
                        continue
                }
                if err != nil {
                        continue // 跳过错误的键
                }
                
                var cache KVCache
                if err := json.Unmarshal(data, &cache); err != nil {
                        continue // 跳过反序列化失败的键
                }
                
                cache.AccessedAt = time.Now()
                result[keys[i]] = &cache
                atomic.AddInt64(&m.hitCount, 1)
        }
        
        return result, nil
}

// Stats 返回缓存统计信息
func (m *RedisKVCacheManager) Stats() CacheStats {
        hitCount := atomic.LoadInt64(&m.hitCount)
        missCount := atomic.LoadInt64(&m.missCount)
        totalRequests := hitCount + missCount
        
        var hitRatio float64
        if totalRequests > 0 {
                hitRatio = float64(hitCount) / float64(totalRequests)
        }
        
        getCount := atomic.LoadInt64(&m.getCount)
        setCount := atomic.LoadInt64(&m.setCount)
        getTotalLatency := atomic.LoadInt64(&m.getTotalLatency)
        setTotalLatency := atomic.LoadInt64(&m.setTotalLatency)
        
        var avgGetLatencyMs, avgSetLatencyMs float64
        if getCount > 0 {
                avgGetLatencyMs = float64(getTotalLatency) / float64(getCount) 
/ 1e6
        }
        if setCount > 0 {
                avgSetLatencyMs = float64(setTotalLatency) / float64(setCount) 
/ 1e6
        }
        
        return CacheStats{
                HitCount:        hitCount,
                MissCount:       missCount,
                HitRatio:        hitRatio,
                AvgGetLatencyMs: avgGetLatencyMs,
                AvgSetLatencyMs: avgSetLatencyMs,
        }
}

// makeRedisKey 生成 Redis 键
func (m *RedisKVCacheManager) makeRedisKey(key string) string {
        return fmt.Sprintf("%s:%s", m.keyPrefix, key)
}

// Close 关闭连接
func (m *RedisKVCacheManager) Close() error {
        return m.client.Close()
}

var ErrCacheNotFound = fmt.Errorf("cache not found")

func isRedisNilError(err error) bool {
        return err == redis.Nil
}
```

#### Cache Key Generator

```go
// pkg/filter/ai/kvcache/cache_key.go
package kvcache

import (
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "sort"
        "strings"
)

// CacheKeyGenerator 缓存键生成器
type CacheKeyGenerator struct {
        includeModel     bool
        includeSessionID bool
        includePrompt    bool
        includeParams    bool
}

// NewCacheKeyGenerator 创建缓存键生成器
func NewCacheKeyGenerator(config *CacheKeyConfig) *CacheKeyGenerator {
        return &CacheKeyGenerator{
                includeModel:     config.IncludeModel,
                includeSessionID: config.IncludeSessionID,
                includePrompt:    config.IncludePrompt,
                includeParams:    config.IncludeParams,
        }
}

// Generate 生成缓存键
func (g *CacheKeyGenerator) Generate(req *CacheKeyRequest) string {
        var parts []string
        
        // 1. Session ID(必须)
        if g.includeSessionID && req.SessionID != "" {
                parts = append(parts, fmt.Sprintf("sid:%s", req.SessionID))
        }
        
        // 2. Model Name
        if g.includeModel && req.ModelName != "" {
                parts = append(parts, fmt.Sprintf("model:%s", req.ModelName))
        }
        
        // 3. Prompt Hash
        if g.includePrompt && req.Prompt != "" {
                promptHash := hashString(req.Prompt)
                parts = append(parts, fmt.Sprintf("prompt:%s", promptHash))
        }
        
        // 4. Parameters Hash(可选)
        if g.includeParams && len(req.Parameters) > 0 {
                paramsHash := hashParams(req.Parameters)
                parts = append(parts, fmt.Sprintf("params:%s", paramsHash))
        }
        
        // 5. Round Number(多轮对话的轮次)
        if req.RoundNumber > 0 {
                parts = append(parts, fmt.Sprintf("round:%d", req.RoundNumber))
        }
        
        return strings.Join(parts, ":")
}

// CacheKeyRequest 缓存键生成请求
type CacheKeyRequest struct {
        SessionID   string            // 会话 ID
        ModelName   string            // 模型名称
        Prompt      string            // 当前轮次的 Prompt
        Parameters  map[string]any    // 推理参数(temperature, top_p 等)
        RoundNumber int               // 多轮对话的轮次(从 1 开始)
}

// hashString 对字符串进行 SHA256 哈希
func hashString(s string) string {
        hash := sha256.Sum256([]byte(s))
        return hex.EncodeToString(hash[:])[:16] // 取前 16 位
}

// hashParams 对参数 map 进行哈希
func hashParams(params map[string]any) string {
        // 排序键以保证稳定性
        keys := make([]string, 0, len(params))
        for k := range params {
                keys = append(keys, k)
        }
        sort.Strings(keys)
        
        // 构建字符串
        var sb strings.Builder
        for _, k := range keys {
                sb.WriteString(fmt.Sprintf("%s=%v;", k, params[k]))
        }
        
        return hashString(sb.String())
}
```

#### Filter Implementation

```go
// pkg/filter/ai/kvcache/kvcache.go
package kvcache

import (
        "context"
        "encoding/json"
        "time"

        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
)

const (
        Kind = constant.AIKVCacheFilter
)

func init() {
        filter.RegisterHttpFilter(&Plugin{})
}

type (
        Plugin struct{}

        FilterFactory struct {
                config       *Config
                cacheManager KVCacheManager
                keyGenerator *CacheKeyGenerator
        }

        Filter struct {
                factory *FilterFactory
        }

        Config struct {
                Enabled       bool              `yaml:"enabled" json:"enabled"`
                Backend       string            `yaml:"backend" json:"backend"` 
// "redis" or "memory"
                RedisConfig   *RedisConfig      `yaml:"redis" json:"redis"`
                CacheKeyConfig *CacheKeyConfig  `yaml:"cache_key" 
json:"cache_key"`
                DefaultTTL    time.Duration     `yaml:"default_ttl" 
json:"default_ttl"`
                EnableMetrics bool              `yaml:"enable_metrics" 
json:"enable_metrics"`
                TargetModels  []string          `yaml:"target_models" 
json:"target_models"` // 只对这些模型启用缓存
        }

        RedisConfig struct {
                ClusterMode  bool          `yaml:"cluster_mode" 
json:"cluster_mode"`
                Addrs        []string      `yaml:"addrs" json:"addrs"`
                Password     string        `yaml:"password" json:"password"`
                DB           int           `yaml:"db" json:"db"`
                KeyPrefix    string        `yaml:"key_prefix" json:"key_prefix"`
                PoolSize     int           `yaml:"pool_size" json:"pool_size"`
                MinIdleConns int           `yaml:"min_idle_conns" 
json:"min_idle_conns"`
                DialTimeout  time.Duration `yaml:"dial_timeout" 
json:"dial_timeout"`
                ReadTimeout  time.Duration `yaml:"read_timeout" 
json:"read_timeout"`
                WriteTimeout time.Duration `yaml:"write_timeout" 
json:"write_timeout"`
        }

        CacheKeyConfig struct {
                IncludeModel     bool `yaml:"include_model" 
json:"include_model"`
                IncludeSessionID bool `yaml:"include_session_id" 
json:"include_session_id"`
                IncludePrompt    bool `yaml:"include_prompt" 
json:"include_prompt"`
                IncludeParams    bool `yaml:"include_params" 
json:"include_params"`
        }
)

func (p *Plugin) Kind() string {
        return Kind
}

func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
        return &FilterFactory{
                config: &Config{
                        Enabled:    false,
                        Backend:    "redis",
                        DefaultTTL: 3600 * time.Second,
                },
        }, nil
}

func (f *FilterFactory) Config() any {
        return f.config
}

func (f *FilterFactory) Apply() error {
        if !f.config.Enabled {
                logger.Info("[KVCache] Filter is disabled")
                return nil
        }

        // 初始化缓存管理器
        var err error
        switch f.config.Backend {
        case "redis":
                f.cacheManager, err = 
NewRedisKVCacheManager(f.config.RedisConfig)
                if err != nil {
                        return err
                }
        default:
                return fmt.Errorf("unsupported cache backend: %s", 
f.config.Backend)
        }

        // 初始化缓存键生成器
        f.keyGenerator = NewCacheKeyGenerator(f.config.CacheKeyConfig)

        logger.Infof("[KVCache] Filter initialized with backend: %s", 
f.config.Backend)
        return nil
}

func (f *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain 
filter.FilterChain) error {
        if !f.config.Enabled {
                return nil
        }

        kvcFilter := &Filter{factory: f}
        chain.AppendDecodeFilters(kvcFilter)
        chain.AppendEncodeFilters(kvcFilter)
        return nil
}

// Decode 在请求阶段尝试加载 KV Cache
func (f *Filter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus {
        // 检查是否是目标模型
        modelName := extractModelName(ctx)
        if !f.factory.isTargetModel(modelName) {
                return filter.Continue
        }

        // 提取会话信息
        sessionID := extractSessionID(ctx)
        if sessionID == "" {
                // 没有会话 ID,跳过缓存
                return filter.Continue
        }

        // 生成缓存键
        cacheKey := f.factory.keyGenerator.Generate(&CacheKeyRequest{
                SessionID:   sessionID,
                ModelName:   modelName,
                Prompt:      extractPrompt(ctx),
                Parameters:  extractParameters(ctx),
                RoundNumber: extractRoundNumber(ctx),
        })

        // 尝试加载 KV Cache
        reqCtx := ctx.Request.Context()
        cache, err := f.factory.cacheManager.Get(reqCtx, cacheKey)
        if err == nil && cache != nil {
                // 缓存命中!将 KV Cache 注入到请求中
                injectKVCache(ctx, cache)
                logger.Infof("[KVCache] Cache HIT for session %s, key: %s", 
sessionID, cacheKey)
                
                // 记录指标
                recordCacheHit(ctx, modelName)
        } else {
                // 缓存未命中
                logger.Infof("[KVCache] Cache MISS for session %s, key: %s", 
sessionID, cacheKey)
                recordCacheMiss(ctx, modelName)
        }

        // 保存缓存键到上下文,供 Encode 阶段使用
        ctx.Params["kv_cache_key"] = cacheKey

        return filter.Continue
}

// Encode 在响应阶段保存 KV Cache
func (f *Filter) Encode(ctx *contexthttp.HttpContext) filter.FilterStatus {
        // 提取缓存键
        cacheKeyObj, ok := ctx.Params["kv_cache_key"]
        if !ok {
                return filter.Continue
        }
        cacheKey, ok := cacheKeyObj.(string)
        if !ok || cacheKey == "" {
                return filter.Continue
        }

        // 从响应中提取 KV Cache
        kvCache := extractKVCacheFromResponse(ctx)
        if kvCache == nil {
                return filter.Continue
        }

        // 异步保存 KV Cache(不阻塞响应)
        go func() {
                saveCtx, cancel := context.WithTimeout(context.Background(), 
5*time.Second)
                defer cancel()

                if err := f.factory.cacheManager.Set(saveCtx, cacheKey, 
kvCache, f.factory.config.DefaultTTL); err != nil {
                        logger.Errorf("[KVCache] Failed to save cache for key 
%s: %v", cacheKey, err)
                } else {
                        logger.Infof("[KVCache] Cache saved for key: %s, size: 
%d bytes", cacheKey, len(kvCache.Keys)+len(kvCache.Values))
                }
        }()

        return filter.Continue
}

// 辅助函数

func (f *FilterFactory) isTargetModel(modelName string) bool {
        if len(f.config.TargetModels) == 0 {
                return true // 空列表表示所有模型
        }
        for _, target := range f.config.TargetModels {
                if target == modelName {
                        return true
                }
        }
        return false
}

func extractModelName(ctx *contexthttp.HttpContext) string {
        // 从请求中提取模型名称
        // 实现取决于具体的 API 格式
        return ""
}

func extractSessionID(ctx *contexthttp.HttpContext) string {
        // 从请求头或请求体中提取会话 ID
        return ""
}

func extractPrompt(ctx *contexthttp.HttpContext) string {
        // 从请求体中提取 Prompt
        return ""
}

func extractParameters(ctx *contexthttp.HttpContext) map[string]any {
        // 从请求体中提取推理参数
        return nil
}

func extractRoundNumber(ctx *contexthttp.HttpContext) int {
        // 从请求中提取对话轮次
        return 0
}

func injectKVCache(ctx *contexthttp.HttpContext, cache *KVCache) {
        // 将 KV Cache 注入到转发给 LLM 推理服务的请求中
        // 具体实现取决于推理服务的 API 格式
}

func extractKVCacheFromResponse(ctx *contexthttp.HttpContext) *KVCache {
        // 从 LLM 推理服务的响应中提取 KV Cache
        // 具体实现取决于推理服务的响应格式
        return nil
}

func recordCacheHit(ctx *contexthttp.HttpContext, modelName string) {
        // 记录缓存命中指标
}

func recordCacheMiss(ctx *contexthttp.HttpContext, modelName string) {
        // 记录缓存未命中指标
}
```

---

## 配置示例

### 完整配置

```yaml
# configs/ai_kvcache_config.yaml

static_resources:
  listeners:
    - name: "ai-gateway-listener"
      protocol: HTTP
      address:
        socket_address:
          address: "0.0.0.0"
          port: 8888
      filter_chains:
        - filters:
            # 1. KV Cache Filter(放在 LLM Proxy 之前)
            - name: dgp.filter.ai.kvcache
              config:
                enabled: true
                backend: redis
                redis:
                  cluster_mode: true
                  addrs:
                    - "redis-node-1:6379"
                    - "redis-node-2:6379"
                    - "redis-node-3:6379"
                  password: "your_redis_password"
                  key_prefix: "pixiu:kvcache"
                  pool_size: 100
                  min_idle_conns: 10
                  dial_timeout: 5s
                  read_timeout: 3s
                  write_timeout: 3s
                cache_key:
                  include_model: true
                  include_session_id: true
                  include_prompt: true
                  include_params: false
                default_ttl: 3600s  # 1 小时
                enable_metrics: true
                target_models:
                  - "gpt-4"
                  - "gpt-3.5-turbo"
                  - "deepseek-v2"
            
            # 2. LLM Tokenizer(成本计算)
            - name: dgp.filter.llm.tokenizer
              config:
                enable_cost_tracking: true
            
            # 3. LLM Proxy(转发到推理服务)
            - name: dgp.filter.llm.proxy
              config:
                timeout: 30s

  clusters:
    - name: sglang_cluster
      lb_policy: round_robin
      endpoints:
        - id: sglang-1
          socket_address:
            address: "sglang-service-1"
            port: 8000
        - id: sglang-2
          socket_address:
            address: "sglang-service-2"
            port: 8000

# Redis Cluster 部署配置(参考)
redis_cluster:
  nodes:
    - host: redis-node-1
      port: 6379
      role: master
    - host: redis-node-1-slave
      port: 6379
      role: slave
    - host: redis-node-2
      port: 6379
      role: master
    - host: redis-node-2-slave
      port: 6379
      role: slave
    - host: redis-node-3
      port: 6379
      role: master
    - host: redis-node-3-slave
      port: 6379
      role: slave
  
  persistence:
    rdb:
      enabled: true
      save: "900 1 300 10 60 10000"
    aof:
      enabled: true
      appendfsync: everysec
  
  memory:
    maxmemory: 16gb
    maxmemory_policy: allkeys-lru
```

---

## 性能优化策略

### 1. 零拷贝传输

```go
// 使用 io.Writer 接口直接写入 Redis,避免中间缓冲
func (m *RedisKVCacheManager) SetStream(ctx context.Context, key string, reader 
io.Reader, ttl time.Duration) error {
        // 使用 Redis Streams 或直接写入
        // 避免完整加载到内存
}
```

### 2. 批量操作

```go
// 使用 Pipeline 批量读写
func (m *RedisKVCacheManager) SetMulti(ctx context.Context, caches 
map[string]*KVCache, ttl time.Duration) error {
        pipe := m.client.Pipeline()
        for key, cache := range caches {
                data, _ := json.Marshal(cache)
                pipe.Set(ctx, m.makeRedisKey(key), data, ttl)
        }
        _, err := pipe.Exec(ctx)
        return err
}
```

### 3. 压缩存储

```go
import "github.com/klauspost/compress/zstd"

// 压缩 KV Cache 以减少存储和传输开销
func compressKVCache(data []byte) ([]byte, error) {
        encoder, err := zstd.NewWriter(nil)
        if err != nil {
                return nil, err
        }
        return encoder.EncodeAll(data, make([]byte, 0, len(data))), nil
}
```

### 4. 连接池优化

```yaml
redis:
  pool_size: 100          # 根据并发量调整
  min_idle_conns: 20      # 预热连接
  max_retries: 3          # 重试次数
  pool_timeout: 4s        # 连接池超时
```

---

## 监控指标

### Prometheus Metrics

```go
// pkg/filter/ai/kvcache/metrics.go

var (
        kvCacheHitTotal = promauto.NewCounterVec(
                prometheus.CounterOpts{
                        Name: "pixiu_ai_kvcache_hit_total",
                        Help: "Total number of KV cache hits",
                },
                []string{"model", "session_id"},
        )

        kvCacheMissTotal = promauto.NewCounterVec(
                prometheus.CounterOpts{
                        Name: "pixiu_ai_kvcache_miss_total",
                        Help: "Total number of KV cache misses",
                },
                []string{"model", "session_id"},
        )

        kvCacheHitRatio = promauto.NewGaugeVec(
                prometheus.GaugeOpts{
                        Name: "pixiu_ai_kvcache_hit_ratio",
                        Help: "KV cache hit ratio",
                },
                []string{"model"},
        )

        kvCacheSizeBytes = promauto.NewHistogramVec(
                prometheus.HistogramOpts{
                        Name:    "pixiu_ai_kvcache_size_bytes",
                        Help:    "Size of KV cache in bytes",
                        Buckets: []float64{1024, 10240, 102400, 1048576, 
10485760, 104857600}, // 1KB ~ 100MB
                },
                []string{"model"},
        )

        kvCacheGetLatency = promauto.NewHistogramVec(
                prometheus.HistogramOpts{
                        Name:    "pixiu_ai_kvcache_get_latency_seconds",
                        Help:    "Latency of KV cache get operations",
                        Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), 
// 1ms ~ 1s
                },
                []string{"model"},
        )

        kvCacheSetLatency = promauto.NewHistogramVec(
                prometheus.HistogramOpts{
                        Name:    "pixiu_ai_kvcache_set_latency_seconds",
                        Help:    "Latency of KV cache set operations",
                        Buckets: prometheus.ExponentialBuckets(0.001, 2, 10),
                },
                []string{"model"},
        )

        kvCacheCostSavings = promauto.NewCounterVec(
                prometheus.CounterOpts{
                        Name: "pixiu_ai_kvcache_cost_savings_usd",
                        Help: "Cost savings from cache hits (in USD)",
                },
                []string{"model"},
        )
)
```

### Grafana Dashboard

```json
{
  "dashboard": {
    "title": "Pixiu AI KV Cache Monitoring",
    "panels": [
      {
        "title": "Cache Hit Ratio",
        "targets": [
          {
            "expr": "rate(pixiu_ai_kvcache_hit_total[5m]) / 
(rate(pixiu_ai_kvcache_hit_total[5m]) + rate(pixiu_ai_kvcache_miss_total[5m]))"
          }
        ]
      },
      {
        "title": "Cache Get/Set Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, 
rate(pixiu_ai_kvcache_get_latency_seconds_bucket[5m]))",
            "legendFormat": "Get P95"
          },
          {
            "expr": "histogram_quantile(0.95, 
rate(pixiu_ai_kvcache_set_latency_seconds_bucket[5m]))",
            "legendFormat": "Set P95"
          }
        ]
      },
      {
        "title": "Cost Savings (USD/hour)",
        "targets": [
          {
            "expr": "rate(pixiu_ai_kvcache_cost_savings_usd[1h])"
          }
        ]
      }
    ]
  }
}
```

---

## 部署架构

### Kubernetes 部署

```yaml
# deploy/kvcache-deployment.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: pixiu-kvcache-config
data:
  config.yaml: |
    # ... 完整配置文件内容

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pixiu-ai-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pixiu-ai-gateway
  template:
    metadata:
      labels:
        app: pixiu-ai-gateway
    spec:
      containers:
      - name: pixiu
        image: dubbogopixiu/dubbo-go-pixiu:latest
        ports:
        - containerPort: 8888
        volumeMounts:
        - name: config
          mountPath: /etc/pixiu
        env:
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: redis-secret
              key: password
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
      volumes:
      - name: config
        configMap:
          name: pixiu-kvcache-config

---
apiVersion: v1
kind: Service
metadata:
  name: pixiu-ai-gateway
spec:
  selector:
    app: pixiu-ai-gateway
  ports:
  - port: 8888
    targetPort: 8888
  type: LoadBalancer
```

### Redis Cluster 部署

```yaml
# deploy/redis-cluster.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-cluster
spec:
  serviceName: redis-cluster
  replicas: 6
  selector:
    matchLabels:
      app: redis-cluster
  template:
    metadata:
      labels:
        app: redis-cluster
    spec:
      containers:
      - name: redis
        image: redis:7.2
        command: ["redis-server"]
        args: ["--cluster-enabled", "yes", "--cluster-config-file", 
"/data/nodes.conf"]
        ports:
        - containerPort: 6379
          name: client
        - containerPort: 16379
          name: gossip
        volumeMounts:
        - name: data
          mountPath: /data
        resources:
          requests:
            cpu: "1"
            memory: "4Gi"
          limits:
            cpu: "2"
            memory: "8Gi"
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi
```

---

## 性能测试与验证

### 测试场景

#### 场景 1:无 KV Cache(Baseline)

```bash
# 4 个 SGLang 实例,无共享缓存
QPS: 100
TTFT (P95): 2000ms
TPOT (Avg): 50ms
成本: $100/hour
```

#### 场景 2:有 KV Cache(Redis 集群)

```bash
# 4 个 SGLang 实例 + Redis Cluster + Pixiu KV Cache Filter
QPS: 150 (+50%)
TTFT (P95): 800ms (-60%)
TPOT (Avg): 37ms (-26%)
缓存命中率: 85%
成本: $55/hour (-45%)
```

### 测试工具

```bash
# 使用 tools/benchmark 进行压测
cd tools/benchmark
go run main.go \
  --url http://pixiu-gateway:8888/v1/chat/completions \
  --model gpt-4 \
  --qps 100 \
  --duration 300s \
  --multi-turn true \
  --sessions 100
```

---

## 成本收益分析

### 成本节省计算

假设:
- GPT-4 Input: $0.03/1K tokens,Output: $0.06/1K tokens
- 平均输入 1000 tokens,平均输出 500 tokens
- 多轮对话平均 5 轮
- 每日 100 万次请求

#### 无 KV Cache(每次都预填充)
```
每次请求成本 = (1000 * $0.03 + 500 * $0.06) / 1000 = $0.06
多轮对话成本 = 5 * $0.06 = $0.30
每日成本 = 1,000,000 * $0.30 = $300,000
```

#### 有 KV Cache(缓存命中率 85%)
```
首轮请求成本 = $0.06
后续轮次命中缓存:只计算输出成本 = 500 * $0.06 / 1000 = $0.03
后续轮次未命中:仍需预填充 = $0.06

多轮对话成本 = $0.06 + (0.85 * 4 * $0.03) + (0.15 * 4 * $0.06)
             = $0.06 + $0.102 + $0.036
             = $0.198

每日成本 = 1,000,000 * $0.198 = $198,000
```

**每日节省**:$300,000 - $198,000 = **$102,000**(34% 降本)

**每月节省**:**$3,060,000**

---

## 故障处理与高可用

### Redis 集群故障

1. **自动故障转移**
   - Redis Sentinel 监控主节点
   - 自动选举新主节点
   - 更新客户端连接

2. **降级策略**
   - Redis 不可用时,跳过缓存逻辑
   - 直接转发请求到推理服务
   - 记录告警日志

```go
func (m *RedisKVCacheManager) Get(ctx context.Context, key string) (*KVCache, 
error) {
        data, err := m.client.Get(ctx, redisKey).Bytes()
        if err != nil {
                if isRedisDownError(err) {
                        // Redis 宕机,降级处理
                        logger.Error("[KVCache] Redis is down, fallback to 
no-cache mode")
                        return nil, ErrCacheNotFound // 当作缓存未命中处理
                }
                return nil, err
        }
        // ...
}
```

### 数据一致性

1. **TTL 过期策略**
   - 设置合理的 TTL(如 1 小时)
   - 避免过期数据被使用

2. **版本控制**
   - 在缓存键中包含模型版本
   - 模型更新后自动失效旧缓存

```go
cacheKey := fmt.Sprintf("model:%s:v%s:session:%s", modelName, modelVersion, 
sessionID)
```

---

## 最佳实践

### 1. 缓存键设计

✅ **推荐**:
```
pixiu:kvcache:sid:{session_id}:model:{model_name}:round:{round_number}
```

❌ **不推荐**:
```
kv_{random_id}  # 无法追踪和调试
```

### 2. TTL 设置

- **短会话**(5 分钟内完成):TTL = 10 分钟
- **长会话**(1 小时内完成):TTL = 2 小时
- **跨天会话**:TTL = 24 小时

### 3. 内存管理

- 单个 KV Cache 大小通常在 100MB ~ 1GB
- Redis 集群总内存 = 预期并发会话数 × 平均 KV Cache 大小 × 1.5(冗余)

示例:
```
10,000 并发会话 × 500MB × 1.5 = 7.5TB
部署 10 个 Redis 节点,每个 800GB 内存
```

### 4. 监控告警

设置关键告警阈值:
- 缓存命中率 < 50%:优化缓存键生成策略
- Redis 内存使用率 > 80%:扩容或调整 TTL
- Get/Set 延迟 P95 > 100ms:检查网络或 Redis 性能


GitHub link: https://github.com/apache/dubbo-go-pixiu/discussions/860

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to