This is an automated email from the ASF dual-hosted git repository.

zfeng pushed a commit to branch AI
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/AI by this push:
     new 9c3c9d29 Ospp 2025 transport (#917)
9c3c9d29 is described below

commit 9c3c9d2962b23812739e53aee14ee834de5f46e1
Author: 深几许 <[email protected]>
AuthorDate: Thu Oct 30 00:06:40 2025 +0800

    Ospp 2025 transport (#917)
    
    * upd: clear branch
    
    * feat: implement ospp_2025_transport
    
    * feat: implement ospp_2025_a2a-protocol go.work
    
    ---------
    
    Co-authored-by: FinnTew <[email protected]>
    Co-authored-by: FinnTew <[email protected]>
---
 go.work                          |   5 +-
 go.work.sum                      |  46 ++++++
 transport/README.md              | 331 +++++++++++++++++++++++++++++++++++++++
 transport/common/errors.go       |  93 +++++++++++
 transport/common/registry.go     | 134 ++++++++++++++++
 transport/common/types.go        | 159 +++++++++++++++++++
 transport/common/utils.go        | 165 +++++++++++++++++++
 transport/go.mod                 |  15 ++
 transport/go.sum                 |  34 ++++
 transport/grpc/builder.go        | 187 ++++++++++++++++++++++
 transport/grpc/client.go         | 251 +++++++++++++++++++++++++++++
 transport/grpc/grpc.go           |  91 +++++++++++
 transport/grpc/server.go         | 257 ++++++++++++++++++++++++++++++
 transport/grpc/stream_reader.go  | 113 +++++++++++++
 transport/grpc/stream_writer.go  | 122 +++++++++++++++
 transport/jsonrpc/builder.go     | 155 ++++++++++++++++++
 transport/jsonrpc/client.go      | 208 ++++++++++++++++++++++++
 transport/jsonrpc/jsonrpc.go     |  74 +++++++++
 transport/jsonrpc/server.go      | 247 +++++++++++++++++++++++++++++
 transport/jsonrpc/sse_builder.go | 185 ++++++++++++++++++++++
 transport/jsonrpc/sse_client.go  | 211 +++++++++++++++++++++++++
 transport/jsonrpc/sse_server.go  | 314 +++++++++++++++++++++++++++++++++++++
 transport/jsonrpc/sse_types.go   | 144 +++++++++++++++++
 transport/jsonrpc/types.go       | 155 ++++++++++++++++++
 transport/transport.go           |  95 +++++++++++
 25 files changed, 3790 insertions(+), 1 deletion(-)

diff --git a/go.work b/go.work
index 5c403828..86dff4e2 100644
--- a/go.work
+++ b/go.work
@@ -1,3 +1,6 @@
 go 1.24.5
 
-use ./agenthub
+use (
+  ./transport
+  ./agenthub
+)
\ No newline at end of file
diff --git a/go.work.sum b/go.work.sum
new file mode 100644
index 00000000..9fbb984b
--- /dev/null
+++ b/go.work.sum
@@ -0,0 +1,46 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
+cloud.google.com/go/compute/metadata v0.7.0 
h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
+cloud.google.com/go/compute/metadata v0.7.0/go.mod 
h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
+github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp 
v1.27.0 h1:ErKg/3iS1AKcTkf3yixlZ54f9U1rljCkQyEXWUnIUxc=
+github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp 
v1.27.0/go.mod h1:yAZHSGnqScoU556rBOVkwLze6WP5N+U11RHuWaGVxwY=
+github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 
h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod 
h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
+github.com/envoyproxy/go-control-plane v0.13.4 
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
+github.com/envoyproxy/go-control-plane v0.13.4/go.mod 
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4 
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod 
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod 
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
+github.com/envoyproxy/protoc-gen-validate v1.2.1 
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
+github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod 
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/go-jose/go-jose/v4 v4.0.5 
h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE=
+github.com/go-jose/go-jose/v4 v4.0.5/go.mod 
h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA=
+github.com/golang/glog v1.2.5 h1:DrW6hGnjIhtvhOIiAKT6Psh/Kd/ldepEa81DKeiRJ5I=
+github.com/golang/glog v1.2.5/go.mod 
h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 
h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod 
h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+github.com/spiffe/go-spiffe/v2 v2.5.0 
h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
+github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod 
h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
+github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
+github.com/zeebo/errs v1.4.0/go.mod 
h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
+go.opentelemetry.io/contrib/detectors/gcp v1.36.0 
h1:F7q2tNlCaHY9nMKHR6XH9/qkp8FktLnIcy6jJNyOCQw=
+go.opentelemetry.io/contrib/detectors/gcp v1.36.0/go.mod 
h1:IbBN8uAIIx734PTonTPxAxnjc2pQTxWNkwfstZ+6H2k=
+golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
+golang.org/x/crypto v0.38.0/go.mod 
h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
+golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
+golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
+golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
+golang.org/x/oauth2 v0.30.0/go.mod 
h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
+golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
+golang.org/x/sync v0.14.0/go.mod 
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
+golang.org/x/term v0.32.0/go.mod 
h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d 
h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod 
h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a 
h1:SGktgSolFCo75dnHJF2yMvnns6jCmHFJ0vE4Vn2JKvQ=
+google.golang.org/genproto/googleapis/api 
v0.0.0-20250528174236-200df99c418a/go.mod 
h1:a77HrdMjoeKbnd2jmgcWdaS++ZLZAEq3orIOAEIKiVw=
diff --git a/transport/README.md b/transport/README.md
new file mode 100644
index 00000000..b7f6e17c
--- /dev/null
+++ b/transport/README.md
@@ -0,0 +1,331 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+# Seata Go AI Transport Layer Implementation
+
+This package provides a unified transport layer for both JSON-RPC and gRPC 
protocols, supporting both streaming and non-streaming operations.
+
+## Features
+
+### Protocols Supported
+- **JSON-RPC 2.0**: HTTP-based with Server-Sent Events (SSE) for streaming
+- **gRPC**: Full support including server streaming RPCs
+
+### Core Capabilities
+- **Unified Interface**: Common abstractions for both protocols
+- **Streaming Support**: 
+  - JSON-RPC: SSE-based streaming
+  - gRPC: Server streaming RPCs
+- **Registry Pattern**: Automatic protocol detection and builder registration
+- **Type Safety**: Proper interface contracts with compile-time checks
+- **Error Handling**: Protocol-specific error types with common interface
+- **Extensible**: Easy to add new transport protocols
+
+## Architecture
+
+### Package Structure
+```
+pkg/transport/
+├── common/          # Common interfaces and utilities
+│   ├── types.go     # Core types and interfaces
+│   ├── registry.go  # Protocol registry and factory
+│   ├── utils.go     # Utility functions
+│   └── errors.go    # Error definitions
+├── jsonrpc/         # JSON-RPC implementation
+│   ├── types.go     # JSON-RPC specific types
+│   ├── client.go    # HTTP client implementation
+│   ├── server.go    # HTTP server implementation
+│   ├── sse_*.go     # Server-Sent Events streaming
+│   ├── builder.go   # Factory builders
+│   └── jsonrpc.go   # Package exports and registration
+├── grpc/            # gRPC implementation
+│   ├── client.go    # gRPC client wrapper
+│   ├── server.go    # gRPC server wrapper
+│   ├── stream_*.go  # Streaming implementations
+│   ├── builder.go   # Factory builders
+│   └── grpc.go      # Package exports and registration
+└── transport.go     # Main package exports
+```
+
+### Core Interfaces
+
+#### Client Interface
+```go
+type Client interface {
+    Call(ctx context.Context, msg *Message) (*Response, error)
+    Stream(ctx context.Context, msg *Message) (StreamReader, error)
+    Protocol() Protocol
+    Close() error
+}
+```
+
+#### Server Interface
+```go
+type Server interface {
+    Serve() error
+    Stop(ctx context.Context) error
+    RegisterHandler(method string, handler Handler)
+    RegisterStreamHandler(method string, handler StreamHandler)
+    Protocol() Protocol
+}
+```
+
+## Usage Examples
+
+### Basic JSON-RPC Server
+```go
+import "seata-go-ai-transport"
+
+// Create server
+config := &transport.Config{
+    Protocol: transport.ProtocolJSONRPC,
+    Address:  ":8080",
+    Options: map[string]interface{}{
+        "streaming": true, // Enable SSE
+    },
+}
+
+server, err := transport.CreateServer(config)
+if err != nil {
+    log.Fatal(err)
+}
+
+// Register handler
+server.RegisterHandler("echo", func(ctx context.Context, req 
*transport.Message) (*transport.Response, error) {
+    return transport.CreateResponse(map[string]interface{}{
+        "echo": string(req.Payload),
+    })
+})
+
+// Start server
+go server.Serve()
+```
+
+### Basic JSON-RPC Client
+```go
+// Create client
+config := &transport.Config{
+    Protocol: transport.ProtocolJSONRPC,
+    Address:  "http://localhost:8080";,
+    Options: map[string]interface{}{
+        "streaming": true,
+    },
+}
+
+client, err := transport.CreateClient(config)
+if err != nil {
+    log.Fatal(err)
+}
+defer client.Close()
+
+// Make call
+msg, _ := transport.MarshalMessage(map[string]string{"hello": "world"})
+msg.Method = "echo"
+
+resp, err := client.Call(context.Background(), msg)
+if err != nil {
+    log.Fatal(err)
+}
+
+var result map[string]interface{}
+transport.UnmarshalResponse(resp, &result)
+fmt.Println(result)
+```
+
+### Streaming Example
+```go
+// Register streaming handler
+server.RegisterStreamHandler("stream", func(ctx context.Context, req 
*transport.Message, stream transport.StreamWriter) error {
+    for i := 0; i < 5; i++ {
+        resp, _ := transport.CreateStreamResponse(map[string]int{"count": i}, 
i == 4)
+        if err := stream.Send(resp); err != nil {
+            return err
+        }
+        time.Sleep(time.Second)
+    }
+    return nil
+})
+
+// Client streaming call
+streamMsg, _ := transport.MarshalMessage(nil)
+streamMsg.Method = "stream"
+
+stream, err := client.Stream(context.Background(), streamMsg)
+if err != nil {
+    log.Fatal(err)
+}
+defer stream.Close()
+
+for {
+    resp, err := stream.Recv()
+    if err != nil {
+        break
+    }
+    if resp.Done {
+        break
+    }
+    fmt.Printf("Stream data: %s\n", string(resp.Data))
+}
+```
+
+### gRPC Usage
+```go
+// Create gRPC server
+config := &transport.Config{
+    Protocol: transport.ProtocolGRPC,
+    Address:  ":9090",
+    Options: map[string]interface{}{
+        "insecure": true,
+    },
+}
+
+server, err := transport.CreateServer(config)
+if err != nil {
+    log.Fatal(err)
+}
+
+// Register methods (requires proto definitions)
+grpcServer := server.(*grpc.Server)
+grpcServer.RegisterMethod("Ping", grpc.CreateUnaryMethodInfo(
+    "/service/Ping",
+    &PingRequest{},
+    &PongResponse{},
+))
+
+// Start server
+go server.Serve()
+```
+
+### Direct Usage (Without Registry)
+```go
+import (
+    "seata-go-ai-transport/jsonrpc"
+    "seata-go-ai-transport/grpc"
+)
+
+// Direct JSON-RPC client
+jsonClient := jsonrpc.NewSSEJSONRPCClient("http://localhost:8080";, "")
+defer jsonClient.Close()
+
+// Direct gRPC client
+grpcClient, err := grpc.NewGRPCClient("localhost:9090", true)
+if err != nil {
+    log.Fatal(err)
+}
+defer grpcClient.Close()
+```
+
+## Configuration Options
+
+### JSON-RPC Options
+```go
+Options: map[string]interface{}{
+    "endpoint":        "http://localhost:8080";,    // Server endpoint
+    "stream_endpoint": "http://localhost:8080/stream";, // SSE endpoint
+    "timeout":         "30s",                      // Request timeout
+    "headers": map[string]string{                  // Custom headers
+        "Authorization": "Bearer token",
+    },
+    "streaming":       true,                       // Enable SSE streaming
+    "stream_path":     "/stream",                  // SSE path for server
+    "read_timeout":    "30s",                      // Server read timeout
+    "write_timeout":   "30s",                      // Server write timeout
+    "idle_timeout":    "120s",                     // Server idle timeout
+}
+```
+
+### gRPC Options
+```go
+Options: map[string]interface{}{
+    "target":          "localhost:9090",          // Server target
+    "insecure":        true,                      // Use insecure connection
+    "timeout":         "30s",                     // Connection timeout
+    "dial_options":    []grpc.DialOption{...},    // Custom dial options
+    "server_options":  []grpc.ServerOption{...},  // Custom server options
+    "methods": map[string]grpc.MethodInfo{        // Method definitions
+        "Ping": grpc.CreateUnaryMethodInfo(...),
+    },
+}
+```
+
+## Error Handling
+
+The transport layer provides structured error handling:
+
+```go
+// Check if error is transport-specific
+if transport.IsTransportError(err) {
+    if transportErr, ok := transport.GetTransportError(err); ok {
+        fmt.Printf("Protocol: %s, Code: %d, Message: %s\n", 
+            transportErr.Protocol, transportErr.Code, transportErr.Message)
+    }
+}
+
+// JSON-RPC specific errors
+if jsonrpc.IsJSONRPCError(err) {
+    if jsonErr, ok := jsonrpc.GetJSONRPCError(err); ok {
+        fmt.Printf("JSON-RPC Error: %d - %s\n", jsonErr.Code, jsonErr.Message)
+    }
+}
+```
+
+## Implementation Details
+
+### JSON-RPC Features
+- **HTTP/1.1 Transport**: Standard HTTP POST requests
+- **SSE Streaming**: Server-Sent Events for real-time streaming
+- **JSON-RPC 2.0 Compliance**: Full specification compliance
+- **Error Propagation**: Standard JSON-RPC error codes
+- **Content Negotiation**: Proper HTTP headers and content types
+
+### gRPC Features
+- **Server Streaming**: Full server streaming RPC support
+- **Protocol Buffers**: Integration with existing protobuf definitions
+- **Connection Management**: Proper connection lifecycle
+- **Metadata Support**: gRPC metadata as transport headers
+- **Graceful Shutdown**: Clean server shutdown with context cancellation
+
+### Streaming Implementations
+- **JSON-RPC**: Uses SSE (Server-Sent Events) over HTTP
+- **gRPC**: Uses native gRPC server streaming RPCs
+- **Common Interface**: Both implement the same `StreamReader`/`StreamWriter` 
interfaces
+- **Error Handling**: Stream-specific error propagation
+
+## Thread Safety
+
+All implementations are thread-safe:
+- **Clients**: Safe for concurrent use
+- **Servers**: Handle multiple concurrent connections
+- **Streams**: Individual streams are not thread-safe (by design)
+- **Registry**: Thread-safe protocol registration and lookup
+
+## Extension Points
+
+To add a new transport protocol:
+
+1. Implement the `Client` and `Server` interfaces
+2. Create `ClientBuilder` and `ServerBuilder` implementations
+3. Register builders in package `init()` function
+4. Add protocol constant to `common/types.go`
+
+Example:
+```go
+func init() {
+    common.RegisterClientBuilder(common.ProtocolWebSocket, &WSClientBuilder{})
+    common.RegisterServerBuilder(common.ProtocolWebSocket, &WSServerBuilder{})
+}
+```
\ No newline at end of file
diff --git a/transport/common/errors.go b/transport/common/errors.go
new file mode 100644
index 00000000..4a2c9889
--- /dev/null
+++ b/transport/common/errors.go
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+       "errors"
+       "fmt"
+)
+
+var (
+       // ErrProtocolNotSupported indicates the protocol is not supported
+       ErrProtocolNotSupported = errors.New("protocol not supported")
+
+       // ErrClientClosed indicates the client is already closed
+       ErrClientClosed = errors.New("client is closed")
+
+       // ErrServerClosed indicates the server is already closed
+       ErrServerClosed = errors.New("server is closed")
+
+       // ErrStreamClosed indicates the stream is already closed
+       ErrStreamClosed = errors.New("stream is closed")
+
+       // ErrInvalidConfig indicates invalid configuration
+       ErrInvalidConfig = errors.New("invalid configuration")
+
+       // ErrHandlerNotFound indicates handler not found for method
+       ErrHandlerNotFound = errors.New("handler not found")
+
+       // ErrTimeout indicates operation timeout
+       ErrTimeout = errors.New("operation timeout")
+
+       // ErrConnectionFailed indicates connection failed
+       ErrConnectionFailed = errors.New("connection failed")
+)
+
+// TransportError represents a transport-specific error
+type TransportError struct {
+       Protocol Protocol
+       Code     int
+       Message  string
+       Cause    error
+}
+
+// Error implements the error interface
+func (e *TransportError) Error() string {
+       if e.Cause != nil {
+               return fmt.Sprintf("%s transport error [%d]: %s: %v", 
e.Protocol, e.Code, e.Message, e.Cause)
+       }
+       return fmt.Sprintf("%s transport error [%d]: %s", e.Protocol, e.Code, 
e.Message)
+}
+
+// Unwrap returns the cause error
+func (e *TransportError) Unwrap() error {
+       return e.Cause
+}
+
+// NewTransportError creates a new transport error
+func NewTransportError(protocol Protocol, code int, message string, cause 
error) *TransportError {
+       return &TransportError{
+               Protocol: protocol,
+               Code:     code,
+               Message:  message,
+               Cause:    cause,
+       }
+}
+
+// IsTransportError checks if an error is a TransportError
+func IsTransportError(err error) bool {
+       var te *TransportError
+       return errors.As(err, &te)
+}
+
+// GetTransportError extracts TransportError from error
+func GetTransportError(err error) (*TransportError, bool) {
+       var te *TransportError
+       ok := errors.As(err, &te)
+       return te, ok
+}
diff --git a/transport/common/registry.go b/transport/common/registry.go
new file mode 100644
index 00000000..86f64a72
--- /dev/null
+++ b/transport/common/registry.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+       "fmt"
+       "sync"
+)
+
+// Registry manages transport builders
+type Registry struct {
+       mu             sync.RWMutex
+       clientBuilders map[Protocol]ClientBuilder
+       serverBuilders map[Protocol]ServerBuilder
+}
+
+// NewRegistry creates a new transport registry
+func NewRegistry() *Registry {
+       return &Registry{
+               clientBuilders: make(map[Protocol]ClientBuilder),
+               serverBuilders: make(map[Protocol]ServerBuilder),
+       }
+}
+
+// RegisterClientBuilder registers a client builder for a protocol
+func (r *Registry) RegisterClientBuilder(protocol Protocol, builder 
ClientBuilder) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       r.clientBuilders[protocol] = builder
+}
+
+// RegisterServerBuilder registers a server builder for a protocol
+func (r *Registry) RegisterServerBuilder(protocol Protocol, builder 
ServerBuilder) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       r.serverBuilders[protocol] = builder
+}
+
+// GetClientBuilder gets a client builder for a protocol
+func (r *Registry) GetClientBuilder(protocol Protocol) (ClientBuilder, error) {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+
+       builder, exists := r.clientBuilders[protocol]
+       if !exists {
+               return nil, fmt.Errorf("no client builder registered for 
protocol %s", protocol)
+       }
+       return builder, nil
+}
+
+// GetServerBuilder gets a server builder for a protocol
+func (r *Registry) GetServerBuilder(protocol Protocol) (ServerBuilder, error) {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+
+       builder, exists := r.serverBuilders[protocol]
+       if !exists {
+               return nil, fmt.Errorf("no server builder registered for 
protocol %s", protocol)
+       }
+       return builder, nil
+}
+
+// CreateClient creates a client using the registered builder
+func (r *Registry) CreateClient(config *Config) (Client, error) {
+       builder, err := r.GetClientBuilder(config.Protocol)
+       if err != nil {
+               return nil, err
+       }
+       return builder.Build(config)
+}
+
+// CreateServer creates a server using the registered builder
+func (r *Registry) CreateServer(config *Config) (Server, error) {
+       builder, err := r.GetServerBuilder(config.Protocol)
+       if err != nil {
+               return nil, err
+       }
+       return builder.Build(config)
+}
+
+// SupportedProtocols returns all supported protocols
+func (r *Registry) SupportedProtocols() []Protocol {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+
+       protocols := make([]Protocol, 0, len(r.clientBuilders))
+       for protocol := range r.clientBuilders {
+               protocols = append(protocols, protocol)
+       }
+       return protocols
+}
+
+// Default registry instance
+var defaultRegistry = NewRegistry()
+
+// RegisterClientBuilder registers a client builder in the default registry
+func RegisterClientBuilder(protocol Protocol, builder ClientBuilder) {
+       defaultRegistry.RegisterClientBuilder(protocol, builder)
+}
+
+// RegisterServerBuilder registers a server builder in the default registry
+func RegisterServerBuilder(protocol Protocol, builder ServerBuilder) {
+       defaultRegistry.RegisterServerBuilder(protocol, builder)
+}
+
+// CreateClient creates a client using the default registry
+func CreateClient(config *Config) (Client, error) {
+       return defaultRegistry.CreateClient(config)
+}
+
+// CreateServer creates a server using the default registry
+func CreateServer(config *Config) (Server, error) {
+       return defaultRegistry.CreateServer(config)
+}
+
+// GetSupportedProtocols returns all supported protocols from default registry
+func GetSupportedProtocols() []Protocol {
+       return defaultRegistry.SupportedProtocols()
+}
diff --git a/transport/common/types.go b/transport/common/types.go
new file mode 100644
index 00000000..8932420e
--- /dev/null
+++ b/transport/common/types.go
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+       "context"
+       "io"
+)
+
+// Protocol represents the transport protocol type
+type Protocol int
+
+const (
+       ProtocolGRPC Protocol = iota
+       ProtocolJSONRPC
+)
+
+func (p Protocol) String() string {
+       switch p {
+       case ProtocolGRPC:
+               return "grpc"
+       case ProtocolJSONRPC:
+               return "jsonrpc"
+       default:
+               return "unknown"
+       }
+}
+
+// StreamType represents the streaming capability
+type StreamType int
+
+const (
+       StreamTypeUnary StreamType = iota
+       StreamTypeServerStream
+       StreamTypeClientStream
+       StreamTypeBiDirStream
+)
+
+// Message represents a generic transport message
+type Message struct {
+       Method  string
+       Payload []byte
+       Headers map[string]string
+}
+
+// Response represents a generic transport response
+type Response struct {
+       Data    []byte
+       Headers map[string]string
+       Error   error
+}
+
+// StreamResponse represents a streaming response
+type StreamResponse struct {
+       Data    []byte
+       Headers map[string]string
+       Done    bool
+       Error   error
+}
+
+// Client represents a transport client interface
+type Client interface {
+       // Call makes a unary RPC call
+       Call(ctx context.Context, msg *Message) (*Response, error)
+
+       // Stream makes a streaming RPC call
+       Stream(ctx context.Context, msg *Message) (StreamReader, error)
+
+       // Protocol returns the underlying protocol
+       Protocol() Protocol
+
+       // Close closes the client connection
+       Close() error
+}
+
+// Server represents a transport server interface
+type Server interface {
+       // Serve starts the server
+       Serve() error
+
+       // Stop stops the server gracefully
+       Stop(ctx context.Context) error
+
+       // RegisterHandler registers a handler for a method
+       RegisterHandler(method string, handler Handler)
+
+       // RegisterStreamHandler registers a streaming handler
+       RegisterStreamHandler(method string, handler StreamHandler)
+
+       // Protocol returns the underlying protocol
+       Protocol() Protocol
+}
+
+// Handler represents a unary RPC handler
+type Handler func(ctx context.Context, req *Message) (*Response, error)
+
+// StreamHandler represents a streaming RPC handler
+type StreamHandler func(ctx context.Context, req *Message, stream 
StreamWriter) error
+
+// StreamReader provides methods to read from a stream
+type StreamReader interface {
+       // Recv receives the next message from the stream
+       Recv() (*StreamResponse, error)
+
+       // Close closes the stream
+       Close() error
+}
+
+// StreamWriter provides methods to write to a stream
+type StreamWriter interface {
+       // Send sends a message to the stream
+       Send(resp *StreamResponse) error
+
+       // Close closes the stream
+       Close() error
+}
+
+// Config represents transport configuration
+type Config struct {
+       Protocol Protocol               `json:"protocol"`
+       Address  string                 `json:"address"`
+       Options  map[string]interface{} `json:"options,omitempty"`
+}
+
+// ClientBuilder builds transport clients
+type ClientBuilder interface {
+       // Build creates a new client with the given config
+       Build(config *Config) (Client, error)
+
+       // Protocol returns the protocol this builder supports
+       Protocol() Protocol
+}
+
+// ServerBuilder builds transport servers
+type ServerBuilder interface {
+       // Build creates a new server with the given config
+       Build(config *Config) (Server, error)
+
+       // Protocol returns the protocol this builder supports
+       Protocol() Protocol
+}
+
+var _ io.Closer = (StreamReader)(nil)
+var _ io.Closer = (StreamWriter)(nil)
diff --git a/transport/common/utils.go b/transport/common/utils.go
new file mode 100644
index 00000000..a73dc9a2
--- /dev/null
+++ b/transport/common/utils.go
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+       "encoding/json"
+       "fmt"
+)
+
+// MarshalMessage marshals a message to JSON
+func MarshalMessage(v interface{}) (*Message, error) {
+       data, err := json.Marshal(v)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal message: %w", err)
+       }
+
+       return &Message{
+               Payload: data,
+               Headers: make(map[string]string),
+       }, nil
+}
+
+// UnmarshalResponse unmarshals a response from JSON
+func UnmarshalResponse(resp *Response, v interface{}) error {
+       if resp.Error != nil {
+               return resp.Error
+       }
+
+       if err := json.Unmarshal(resp.Data, v); err != nil {
+               return fmt.Errorf("failed to unmarshal response: %w", err)
+       }
+
+       return nil
+}
+
+// UnmarshalStreamResponse unmarshals a stream response from JSON
+func UnmarshalStreamResponse(resp *StreamResponse, v interface{}) error {
+       if resp.Error != nil {
+               return resp.Error
+       }
+
+       if resp.Done {
+               return nil
+       }
+
+       if err := json.Unmarshal(resp.Data, v); err != nil {
+               return fmt.Errorf("failed to unmarshal stream response: %w", 
err)
+       }
+
+       return nil
+}
+
+// CreateResponse creates a response with JSON marshaled data
+func CreateResponse(v interface{}) (*Response, error) {
+       data, err := json.Marshal(v)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal response: %w", err)
+       }
+
+       return &Response{
+               Data:    data,
+               Headers: make(map[string]string),
+       }, nil
+}
+
+// CreateStreamResponse creates a stream response with JSON marshaled data
+func CreateStreamResponse(v interface{}, done bool) (*StreamResponse, error) {
+       var data []byte
+       var err error
+
+       if v != nil && !done {
+               data, err = json.Marshal(v)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to marshal stream 
response: %w", err)
+               }
+       }
+
+       return &StreamResponse{
+               Data:    data,
+               Headers: make(map[string]string),
+               Done:    done,
+       }, nil
+}
+
+// CreateErrorResponse creates an error response
+func CreateErrorResponse(err error) *Response {
+       return &Response{
+               Data:    nil,
+               Headers: make(map[string]string),
+               Error:   err,
+       }
+}
+
+// CreateErrorStreamResponse creates an error stream response
+func CreateErrorStreamResponse(err error) *StreamResponse {
+       return &StreamResponse{
+               Data:    nil,
+               Headers: make(map[string]string),
+               Done:    true,
+               Error:   err,
+       }
+}
+
+// SetHeader sets a header in message
+func (m *Message) SetHeader(key, value string) {
+       if m.Headers == nil {
+               m.Headers = make(map[string]string)
+       }
+       m.Headers[key] = value
+}
+
+// GetHeader gets a header from message
+func (m *Message) GetHeader(key string) string {
+       if m.Headers == nil {
+               return ""
+       }
+       return m.Headers[key]
+}
+
+// SetHeader sets a header in response
+func (r *Response) SetHeader(key, value string) {
+       if r.Headers == nil {
+               r.Headers = make(map[string]string)
+       }
+       r.Headers[key] = value
+}
+
+// GetHeader gets a header from response
+func (r *Response) GetHeader(key string) string {
+       if r.Headers == nil {
+               return ""
+       }
+       return r.Headers[key]
+}
+
+// SetHeader sets a header in stream response
+func (sr *StreamResponse) SetHeader(key, value string) {
+       if sr.Headers == nil {
+               sr.Headers = make(map[string]string)
+       }
+       sr.Headers[key] = value
+}
+
+// GetHeader gets a header from stream response
+func (sr *StreamResponse) GetHeader(key string) string {
+       if sr.Headers == nil {
+               return ""
+       }
+       return sr.Headers[key]
+}
diff --git a/transport/go.mod b/transport/go.mod
new file mode 100644
index 00000000..a3b0eeb1
--- /dev/null
+++ b/transport/go.mod
@@ -0,0 +1,15 @@
+module seata-go-ai-transport
+
+go 1.24.5
+
+require (
+       google.golang.org/grpc v1.74.2
+       google.golang.org/protobuf v1.36.6
+)
+
+require (
+       golang.org/x/net v0.40.0 // indirect
+       golang.org/x/sys v0.33.0 // indirect
+       golang.org/x/text v0.25.0 // indirect
+       google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250528174236-200df99c418a // indirect
+)
diff --git a/transport/go.sum b/transport/go.sum
new file mode 100644
index 00000000..e5226a87
--- /dev/null
+++ b/transport/go.sum
@@ -0,0 +1,34 @@
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod 
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod 
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang/protobuf v1.5.4 
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod 
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod 
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+go.opentelemetry.io/auto/sdk v1.1.0 
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod 
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/otel v1.36.0 
h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
+go.opentelemetry.io/otel v1.36.0/go.mod 
h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
+go.opentelemetry.io/otel/metric v1.36.0 
h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
+go.opentelemetry.io/otel/metric v1.36.0/go.mod 
h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
+go.opentelemetry.io/otel/sdk v1.36.0 
h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
+go.opentelemetry.io/otel/sdk v1.36.0/go.mod 
h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
+go.opentelemetry.io/otel/sdk/metric v1.36.0 
h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
+go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod 
h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
+go.opentelemetry.io/otel/trace v1.36.0 
h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
+go.opentelemetry.io/otel/trace v1.36.0/go.mod 
h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
+golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
+golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
+golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
+golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
+golang.org/x/text v0.25.0/go.mod 
h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a 
h1:v2PbRU4K3llS09c7zodFpNePeamkAwG3mPrAery9VeE=
+google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250528174236-200df99c418a/go.mod 
h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
+google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
+google.golang.org/grpc v1.74.2/go.mod 
h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
+google.golang.org/protobuf v1.36.6 
h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+google.golang.org/protobuf v1.36.6/go.mod 
h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
diff --git a/transport/grpc/builder.go b/transport/grpc/builder.go
new file mode 100644
index 00000000..bde23031
--- /dev/null
+++ b/transport/grpc/builder.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "fmt"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "seata-go-ai-transport/common"
+)
+
+// ClientBuilder implements common.ClientBuilder for gRPC
+type ClientBuilder struct{}
+
+var _ common.ClientBuilder = (*ClientBuilder)(nil)
+
+// Build creates a new gRPC client with the given config
+func (b *ClientBuilder) Build(config *common.Config) (common.Client, error) {
+       if config.Protocol != common.ProtocolGRPC {
+               return nil, fmt.Errorf("invalid protocol %s for gRPC client", 
config.Protocol)
+       }
+
+       clientConfig := &ClientConfig{
+               Target:   config.Address,
+               Insecure: true, // Default to insecure for simplicity
+               Timeout:  30 * time.Second,
+               Methods:  make(map[string]MethodInfo),
+       }
+
+       var dialOpts []grpc.DialOption
+
+       // Parse options
+       if config.Options != nil {
+               if target, ok := config.Options["target"].(string); ok {
+                       clientConfig.Target = target
+               }
+               if ins, ok := config.Options["insecure"].(bool); ok {
+                       clientConfig.Insecure = ins
+               }
+               if timeoutStr, ok := config.Options["timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(timeoutStr); err 
== nil {
+                               clientConfig.Timeout = timeout
+                       }
+               }
+               if timeout, ok := config.Options["timeout"].(time.Duration); ok 
{
+                       clientConfig.Timeout = timeout
+               }
+               if options, ok := 
config.Options["dial_options"].([]grpc.DialOption); ok {
+                       dialOpts = append(dialOpts, options...)
+               }
+               if methods, ok := 
config.Options["methods"].(map[string]MethodInfo); ok {
+                       clientConfig.Methods = methods
+               }
+       }
+
+       // Use address as target if not specified in options
+       if clientConfig.Target == "" {
+               clientConfig.Target = config.Address
+       }
+
+       clientConfig.Options = dialOpts
+
+       return NewClient(clientConfig)
+}
+
+// Protocol returns the protocol this builder supports
+func (b *ClientBuilder) Protocol() common.Protocol {
+       return common.ProtocolGRPC
+}
+
+// ServerBuilder implements common.ServerBuilder for gRPC
+type ServerBuilder struct{}
+
+var _ common.ServerBuilder = (*ServerBuilder)(nil)
+
+// Build creates a new gRPC server with the given config
+func (b *ServerBuilder) Build(config *common.Config) (common.Server, error) {
+       if config.Protocol != common.ProtocolGRPC {
+               return nil, fmt.Errorf("invalid protocol %s for gRPC server", 
config.Protocol)
+       }
+
+       serverConfig := &ServerConfig{
+               Address: config.Address,
+               Methods: make(map[string]MethodInfo),
+       }
+
+       var serverOpts []grpc.ServerOption
+
+       // Parse options
+       if config.Options != nil {
+               if options, ok := 
config.Options["server_options"].([]grpc.ServerOption); ok {
+                       serverOpts = append(serverOpts, options...)
+               }
+               if methods, ok := 
config.Options["methods"].(map[string]MethodInfo); ok {
+                       serverConfig.Methods = methods
+               }
+       }
+
+       serverConfig.Options = serverOpts
+
+       return NewServer(serverConfig)
+}
+
+// Protocol returns the protocol this builder supports
+func (b *ServerBuilder) Protocol() common.Protocol {
+       return common.ProtocolGRPC
+}
+
+// Helper functions for creating configurations
+
+// NewGRPCClientConfig creates a client config with common options
+func NewGRPCClientConfig(target string, isInsecure bool) *ClientConfig {
+       config := &ClientConfig{
+               Target:   target,
+               Insecure: isInsecure,
+               Timeout:  30 * time.Second,
+               Methods:  make(map[string]MethodInfo),
+               Options:  []grpc.DialOption{},
+       }
+
+       if isInsecure {
+               config.Options = append(config.Options, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       }
+
+       return config
+}
+
+// NewGRPCServerConfig creates a server config with common options
+func NewGRPCServerConfig(address string) *ServerConfig {
+       return &ServerConfig{
+               Address: address,
+               Methods: make(map[string]MethodInfo),
+               Options: []grpc.ServerOption{},
+       }
+}
+
+// WithDialOptions adds dial options to client config
+func (c *ClientConfig) WithDialOptions(opts ...grpc.DialOption) *ClientConfig {
+       c.Options = append(c.Options, opts...)
+       return c
+}
+
+// WithServerOptions adds server options to server config
+func (c *ServerConfig) WithServerOptions(opts ...grpc.ServerOption) 
*ServerConfig {
+       c.Options = append(c.Options, opts...)
+       return c
+}
+
+// WithMethods adds method information to config
+func (c *ClientConfig) WithMethods(methods map[string]MethodInfo) 
*ClientConfig {
+       if c.Methods == nil {
+               c.Methods = make(map[string]MethodInfo)
+       }
+       for k, v := range methods {
+               c.Methods[k] = v
+       }
+       return c
+}
+
+// WithMethods adds method information to server config
+func (c *ServerConfig) WithMethods(methods map[string]MethodInfo) 
*ServerConfig {
+       if c.Methods == nil {
+               c.Methods = make(map[string]MethodInfo)
+       }
+       for k, v := range methods {
+               c.Methods[k] = v
+       }
+       return c
+}
diff --git a/transport/grpc/client.go b/transport/grpc/client.go
new file mode 100644
index 00000000..82587e98
--- /dev/null
+++ b/transport/grpc/client.go
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/proto"
+
+       "seata-go-ai-transport/common"
+)
+
+// Client implements the common.Client interface for gRPC
+type Client struct {
+       conn    *grpc.ClientConn
+       client  interface{} // Generic gRPC client
+       closed  int32
+       mu      sync.RWMutex
+       methods map[string]MethodInfo
+}
+
+// MethodInfo contains information about a gRPC method
+type MethodInfo struct {
+       FullName    string
+       IsStreaming bool
+       InputType   proto.Message
+       OutputType  proto.Message
+}
+
+// ClientConfig represents gRPC client configuration
+type ClientConfig struct {
+       Target   string                `json:"target"`
+       Insecure bool                  `json:"insecure"`
+       Timeout  time.Duration         `json:"timeout"`
+       Options  []grpc.DialOption     `json:"-"`
+       Methods  map[string]MethodInfo `json:"-"`
+}
+
+var _ common.Client = (*Client)(nil)
+
+// NewClient creates a new gRPC client
+func NewClient(config *ClientConfig) (*Client, error) {
+       opts := config.Options
+       if opts == nil {
+               opts = []grpc.DialOption{}
+       }
+
+       if config.Insecure {
+               opts = append(opts, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       }
+
+       if config.Timeout > 0 {
+               opts = append(opts, grpc.WithTimeout(config.Timeout))
+       }
+
+       conn, err := grpc.NewClient(config.Target, opts...)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create gRPC connection: %w", 
err)
+       }
+
+       client := &Client{
+               conn:    conn,
+               methods: make(map[string]MethodInfo),
+       }
+
+       if config.Methods != nil {
+               for method, info := range config.Methods {
+                       client.methods[method] = info
+               }
+       }
+
+       return client, nil
+}
+
+// Call makes a unary RPC call
+func (c *Client) Call(ctx context.Context, msg *common.Message) 
(*common.Response, error) {
+       if atomic.LoadInt32(&c.closed) == 1 {
+               return nil, common.ErrClientClosed
+       }
+
+       // Get method info
+       c.mu.RLock()
+       methodInfo, exists := c.methods[msg.Method]
+       c.mu.RUnlock()
+
+       if !exists {
+               return nil, fmt.Errorf("method %s not registered", msg.Method)
+       }
+
+       if methodInfo.IsStreaming {
+               return nil, fmt.Errorf("method %s is a streaming method, use 
Stream() instead", msg.Method)
+       }
+
+       // Create input message
+       input := proto.Clone(methodInfo.InputType)
+       if len(msg.Payload) > 0 {
+               if err := json.Unmarshal(msg.Payload, input); err != nil {
+                       return nil, fmt.Errorf("failed to unmarshal request: 
%w", err)
+               }
+       }
+
+       // Create output message
+       output := proto.Clone(methodInfo.OutputType)
+
+       // Make the call
+       err := c.conn.Invoke(ctx, methodInfo.FullName, input, output)
+       if err != nil {
+               return &common.Response{
+                       Headers: make(map[string]string),
+                       Error:   err,
+               }, nil
+       }
+
+       // Marshal response
+       respData, err := json.Marshal(output)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal response: %w", err)
+       }
+
+       return &common.Response{
+               Data:    respData,
+               Headers: make(map[string]string),
+       }, nil
+}
+
+// Stream makes a streaming RPC call
+func (c *Client) Stream(ctx context.Context, msg *common.Message) 
(common.StreamReader, error) {
+       if atomic.LoadInt32(&c.closed) == 1 {
+               return nil, common.ErrClientClosed
+       }
+
+       // Get method info
+       c.mu.RLock()
+       methodInfo, exists := c.methods[msg.Method]
+       c.mu.RUnlock()
+
+       if !exists {
+               return nil, fmt.Errorf("method %s not registered", msg.Method)
+       }
+
+       if !methodInfo.IsStreaming {
+               return nil, fmt.Errorf("method %s is not a streaming method, 
use Call() instead", msg.Method)
+       }
+
+       // Create input message
+       input := proto.Clone(methodInfo.InputType)
+       if len(msg.Payload) > 0 {
+               if err := json.Unmarshal(msg.Payload, input); err != nil {
+                       return nil, fmt.Errorf("failed to unmarshal request: 
%w", err)
+               }
+       }
+
+       // Create streaming description
+       streamDesc := &grpc.StreamDesc{
+               StreamName:    extractMethodName(methodInfo.FullName),
+               ServerStreams: true,
+               ClientStreams: false,
+       }
+
+       // Start stream
+       stream, err := c.conn.NewStream(ctx, streamDesc, methodInfo.FullName)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create stream: %w", err)
+       }
+
+       // Send input message
+       if err := stream.SendMsg(input); err != nil {
+               _ = stream.CloseSend()
+               return nil, fmt.Errorf("failed to send message: %w", err)
+       }
+
+       // Close sending side
+       if err := stream.CloseSend(); err != nil {
+               return nil, fmt.Errorf("failed to close send: %w", err)
+       }
+
+       return NewGRPCStreamReader(ctx, stream, methodInfo.OutputType), nil
+}
+
+// Protocol returns the underlying protocol
+func (c *Client) Protocol() common.Protocol {
+       return common.ProtocolGRPC
+}
+
+// Close closes the client connection
+func (c *Client) Close() error {
+       if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
+               return common.ErrClientClosed
+       }
+
+       return c.conn.Close()
+}
+
+// RegisterMethod registers a method with the client
+func (c *Client) RegisterMethod(method string, info MethodInfo) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.methods[method] = info
+}
+
+// extractMethodName extracts the method name from full method name
+func extractMethodName(fullMethod string) string {
+       // Extract method name from full method path like 
"/package.Service/Method"
+       for i := len(fullMethod) - 1; i >= 0; i-- {
+               if fullMethod[i] == '/' {
+                       return fullMethod[i+1:]
+               }
+       }
+       return fullMethod
+}
+
+// SetClient sets the underlying gRPC client (for custom clients)
+func (c *Client) SetClient(client interface{}) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.client = client
+}
+
+// GetClient gets the underlying gRPC client
+func (c *Client) GetClient() interface{} {
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return c.client
+}
+
+// GetConnection gets the underlying gRPC connection
+func (c *Client) GetConnection() *grpc.ClientConn {
+       return c.conn
+}
diff --git a/transport/grpc/grpc.go b/transport/grpc/grpc.go
new file mode 100644
index 00000000..015a5ac2
--- /dev/null
+++ b/transport/grpc/grpc.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "google.golang.org/protobuf/proto"
+
+       "seata-go-ai-transport/common"
+)
+
+// Package initialization - register builders with the common registry
+func init() {
+       common.RegisterClientBuilder(common.ProtocolGRPC, &ClientBuilder{})
+       common.RegisterServerBuilder(common.ProtocolGRPC, &ServerBuilder{})
+}
+
+// Public constructors for direct usage
+
+// NewGRPCClient creates a new gRPC client directly
+func NewGRPCClient(target string, isInsecure bool) (*Client, error) {
+       config := NewGRPCClientConfig(target, isInsecure)
+       return NewClient(config)
+}
+
+// NewGRPCServer creates a new gRPC server directly
+func NewGRPCServer(address string) (*Server, error) {
+       config := NewGRPCServerConfig(address)
+       return NewServer(config)
+}
+
+// Utility functions for method registration
+
+// CreateMethodInfo creates MethodInfo for a gRPC method
+func CreateMethodInfo(fullName string, isStreaming bool, inputType, outputType 
proto.Message) MethodInfo {
+       return MethodInfo{
+               FullName:    fullName,
+               IsStreaming: isStreaming,
+               InputType:   inputType,
+               OutputType:  outputType,
+       }
+}
+
+// CreateUnaryMethodInfo creates MethodInfo for a unary method
+func CreateUnaryMethodInfo(fullName string, inputType, outputType 
proto.Message) MethodInfo {
+       return CreateMethodInfo(fullName, false, inputType, outputType)
+}
+
+// CreateStreamingMethodInfo creates MethodInfo for a streaming method
+func CreateStreamingMethodInfo(fullName string, inputType, outputType 
proto.Message) MethodInfo {
+       return CreateMethodInfo(fullName, true, inputType, outputType)
+}
+
+// Helper functions for error handling
+
+// IsGRPCError checks if an error is a gRPC error
+func IsGRPCError(err error) bool {
+       // This can be extended to check for specific gRPC error types
+       if err == nil {
+               return false
+       }
+       // Check if it's a gRPC status error
+       _, ok := err.(interface {
+               GRPCStatus() interface{}
+       })
+       return ok
+}
+
+// CreateTransportError creates a transport error from gRPC error
+func CreateTransportError(err error) *common.TransportError {
+       return common.NewTransportError(
+               common.ProtocolGRPC,
+               -1, // gRPC errors don't have simple integer codes
+               err.Error(),
+               err,
+       )
+}
diff --git a/transport/grpc/server.go b/transport/grpc/server.go
new file mode 100644
index 00000000..275981c5
--- /dev/null
+++ b/transport/grpc/server.go
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "net"
+       "sync"
+       "sync/atomic"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/protobuf/proto"
+
+       "seata-go-ai-transport/common"
+)
+
+// Server implements the common.Server interface for gRPC
+type Server struct {
+       server         *grpc.Server
+       address        string
+       listener       net.Listener
+       handlers       map[string]common.Handler
+       streamHandlers map[string]common.StreamHandler
+       methods        map[string]MethodInfo
+       mu             sync.RWMutex
+       started        int32
+       closed         int32
+}
+
+// ServerConfig represents gRPC server configuration
+type ServerConfig struct {
+       Address string                `json:"address"`
+       Options []grpc.ServerOption   `json:"-"`
+       Methods map[string]MethodInfo `json:"-"`
+}
+
+var _ common.Server = (*Server)(nil)
+
+// NewServer creates a new gRPC server
+func NewServer(config *ServerConfig) (*Server, error) {
+       opts := config.Options
+       if opts == nil {
+               opts = []grpc.ServerOption{}
+       }
+
+       server := &Server{
+               server:         grpc.NewServer(opts...),
+               address:        config.Address,
+               handlers:       make(map[string]common.Handler),
+               streamHandlers: make(map[string]common.StreamHandler),
+               methods:        make(map[string]MethodInfo),
+       }
+
+       if config.Methods != nil {
+               for method, info := range config.Methods {
+                       server.methods[method] = info
+               }
+       }
+
+       return server, nil
+}
+
+// Serve starts the server
+func (s *Server) Serve() error {
+       if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
+               return fmt.Errorf("server already started")
+       }
+
+       listener, err := net.Listen("tcp", s.address)
+       if err != nil {
+               atomic.StoreInt32(&s.started, 0)
+               return fmt.Errorf("failed to listen on %s: %w", s.address, err)
+       }
+
+       s.listener = listener
+       return s.server.Serve(listener)
+}
+
+// Stop stops the server gracefully
+func (s *Server) Stop(ctx context.Context) error {
+       if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+               return common.ErrServerClosed
+       }
+
+       // Create a channel to signal when graceful stop is complete
+       done := make(chan struct{})
+
+       go func() {
+               s.server.GracefulStop()
+               close(done)
+       }()
+
+       // Wait for graceful stop or context cancellation
+       select {
+       case <-done:
+               return nil
+       case <-ctx.Done():
+               // Force stop if context is cancelled
+               s.server.Stop()
+               return ctx.Err()
+       }
+}
+
+// RegisterHandler registers a handler for a method
+func (s *Server) RegisterHandler(method string, handler common.Handler) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       s.handlers[method] = handler
+}
+
+// RegisterStreamHandler registers a streaming handler
+func (s *Server) RegisterStreamHandler(method string, handler 
common.StreamHandler) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       s.streamHandlers[method] = handler
+}
+
+// Protocol returns the underlying protocol
+func (s *Server) Protocol() common.Protocol {
+       return common.ProtocolGRPC
+}
+
+// RegisterMethod registers a method with the server
+func (s *Server) RegisterMethod(method string, info MethodInfo) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       s.methods[method] = info
+}
+
+// CreateUnaryHandler creates a gRPC unary handler from a common handler
+func (s *Server) CreateUnaryHandler(method string, handler common.Handler) 
grpc.UnaryHandler {
+       return func(ctx context.Context, req interface{}) (interface{}, error) {
+               // Get method info
+               s.mu.RLock()
+               methodInfo, exists := s.methods[method]
+               s.mu.RUnlock()
+
+               if !exists {
+                       return nil, fmt.Errorf("method %s not registered", 
method)
+               }
+
+               // Convert proto message to JSON
+               reqProto, ok := req.(proto.Message)
+               if !ok {
+                       return nil, fmt.Errorf("request is not a proto message")
+               }
+
+               reqData, err := json.Marshal(reqProto)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to marshal request: %w", 
err)
+               }
+
+               // Create common message
+               msg := &common.Message{
+                       Method:  method,
+                       Payload: reqData,
+                       Headers: extractMetadata(ctx),
+               }
+
+               // Call handler
+               resp, err := handler(ctx, msg)
+               if err != nil {
+                       return nil, err
+               }
+
+               if resp.Error != nil {
+                       return nil, resp.Error
+               }
+
+               // Create output message
+               output := proto.Clone(methodInfo.OutputType)
+               if len(resp.Data) > 0 {
+                       if err := json.Unmarshal(resp.Data, output); err != nil 
{
+                               return nil, fmt.Errorf("failed to unmarshal 
response: %w", err)
+                       }
+               }
+
+               return output, nil
+       }
+}
+
+// CreateStreamHandler creates a gRPC stream handler from a common stream 
handler
+func (s *Server) CreateStreamHandler(method string, handler 
common.StreamHandler) grpc.StreamHandler {
+       return func(srv interface{}, stream grpc.ServerStream) error {
+               // Get method info
+               s.mu.RLock()
+               methodInfo, exists := s.methods[method]
+               s.mu.RUnlock()
+
+               if !exists {
+                       return fmt.Errorf("method %s not registered", method)
+               }
+
+               // Receive request
+               input := proto.Clone(methodInfo.InputType)
+               if err := stream.RecvMsg(input); err != nil {
+                       return fmt.Errorf("failed to receive message: %w", err)
+               }
+
+               // Convert to JSON
+               reqData, err := json.Marshal(input)
+               if err != nil {
+                       return fmt.Errorf("failed to marshal request: %w", err)
+               }
+
+               // Create common message
+               msg := &common.Message{
+                       Method:  method,
+                       Payload: reqData,
+                       Headers: extractMetadata(stream.Context()),
+               }
+
+               // Create stream writer
+               streamWriter := NewGRPCStreamWriter(stream, 
methodInfo.OutputType)
+
+               // Call handler
+               return handler(stream.Context(), msg, streamWriter)
+       }
+}
+
+// GetServer returns the underlying gRPC server
+func (s *Server) GetServer() *grpc.Server {
+       return s.server
+}
+
+// extractMetadata extracts gRPC metadata as headers
+func extractMetadata(ctx context.Context) map[string]string {
+       headers := make(map[string]string)
+
+       if md, ok := metadata.FromIncomingContext(ctx); ok {
+               for k, values := range md {
+                       if len(values) > 0 {
+                               headers[k] = values[0]
+                       }
+               }
+       }
+
+       return headers
+}
diff --git a/transport/grpc/stream_reader.go b/transport/grpc/stream_reader.go
new file mode 100644
index 00000000..b1fb7ce1
--- /dev/null
+++ b/transport/grpc/stream_reader.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io"
+       "sync/atomic"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/proto"
+
+       "seata-go-ai-transport/common"
+)
+
+// GRPCStreamReader implements common.StreamReader for gRPC streams
+type GRPCStreamReader struct {
+       ctx        context.Context
+       stream     grpc.ClientStream
+       outputType proto.Message
+       closed     int32
+}
+
+var _ common.StreamReader = (*GRPCStreamReader)(nil)
+
+// NewGRPCStreamReader creates a new gRPC stream reader
+func NewGRPCStreamReader(ctx context.Context, stream grpc.ClientStream, 
outputType proto.Message) *GRPCStreamReader {
+       return &GRPCStreamReader{
+               ctx:        ctx,
+               stream:     stream,
+               outputType: outputType,
+       }
+}
+
+// Recv receives the next message from the stream
+func (r *GRPCStreamReader) Recv() (*common.StreamResponse, error) {
+       if atomic.LoadInt32(&r.closed) == 1 {
+               return nil, common.ErrStreamClosed
+       }
+
+       // Create output message instance
+       output := proto.Clone(r.outputType)
+
+       // Receive message from stream
+       err := r.stream.RecvMsg(output)
+       if err != nil {
+               if err == io.EOF {
+                       return &common.StreamResponse{
+                               Done: true,
+                       }, nil
+               }
+               return &common.StreamResponse{
+                       Error: err,
+                       Done:  true,
+               }, nil
+       }
+
+       // Marshal message to JSON
+       data, err := json.Marshal(output)
+       if err != nil {
+               return &common.StreamResponse{
+                       Error: fmt.Errorf("failed to marshal response: %w", 
err),
+                       Done:  true,
+               }, nil
+       }
+
+       return &common.StreamResponse{
+               Data:    data,
+               Headers: r.extractHeaders(),
+       }, nil
+}
+
+// Close closes the stream
+func (r *GRPCStreamReader) Close() error {
+       if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
+               return common.ErrStreamClosed
+       }
+
+       return r.stream.CloseSend()
+}
+
+// extractHeaders extracts headers from the gRPC stream
+func (r *GRPCStreamReader) extractHeaders() map[string]string {
+       headers := make(map[string]string)
+
+       // Extract metadata from stream
+       if md, err := r.stream.Header(); err == nil {
+               for k, values := range md {
+                       if len(values) > 0 {
+                               headers[k] = values[0]
+                       }
+               }
+       }
+
+       return headers
+}
diff --git a/transport/grpc/stream_writer.go b/transport/grpc/stream_writer.go
new file mode 100644
index 00000000..f57c481e
--- /dev/null
+++ b/transport/grpc/stream_writer.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+       "sync/atomic"
+
+       "google.golang.org/grpc"
+       "google.golang.org/protobuf/proto"
+
+       "seata-go-ai-transport/common"
+)
+
+// GRPCStreamWriter implements common.StreamWriter for gRPC streams
+type GRPCStreamWriter struct {
+       stream     grpc.ServerStream
+       outputType proto.Message
+       closed     int32
+       mu         sync.Mutex
+}
+
+var _ common.StreamWriter = (*GRPCStreamWriter)(nil)
+
+// NewGRPCStreamWriter creates a new gRPC stream writer
+func NewGRPCStreamWriter(stream grpc.ServerStream, outputType proto.Message) 
*GRPCStreamWriter {
+       return &GRPCStreamWriter{
+               stream:     stream,
+               outputType: outputType,
+       }
+}
+
+// Send sends a message to the stream
+func (w *GRPCStreamWriter) Send(resp *common.StreamResponse) error {
+       if atomic.LoadInt32(&w.closed) == 1 {
+               return common.ErrStreamClosed
+       }
+
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       // Handle error response
+       if resp.Error != nil {
+               return resp.Error
+       }
+
+       // If marked as done with no data, don't send anything
+       if resp.Done && len(resp.Data) == 0 {
+               return nil
+       }
+
+       // Create output message
+       output := proto.Clone(w.outputType)
+
+       // Unmarshal response data if present
+       if len(resp.Data) > 0 {
+               if err := json.Unmarshal(resp.Data, output); err != nil {
+                       return fmt.Errorf("failed to unmarshal response data: 
%w", err)
+               }
+       }
+
+       // Send message
+       if err := w.stream.SendMsg(output); err != nil {
+               return fmt.Errorf("failed to send message: %w", err)
+       }
+
+       return nil
+}
+
+// Close closes the stream
+func (w *GRPCStreamWriter) Close() error {
+       if !atomic.CompareAndSwapInt32(&w.closed, 0, 1) {
+               return common.ErrStreamClosed
+       }
+
+       // gRPC server streams are automatically closed when the handler returns
+       // No explicit close action needed
+       return nil
+}
+
+// SendError sends an error through the stream
+func (w *GRPCStreamWriter) SendError(err error) error {
+       return w.Send(&common.StreamResponse{
+               Error: err,
+               Done:  true,
+       })
+}
+
+// SendData sends data through the stream
+func (w *GRPCStreamWriter) SendData(data interface{}, done bool) error {
+       var jsonData []byte
+       var err error
+
+       if data != nil {
+               jsonData, err = json.Marshal(data)
+               if err != nil {
+                       return fmt.Errorf("failed to marshal data: %w", err)
+               }
+       }
+
+       return w.Send(&common.StreamResponse{
+               Data: jsonData,
+               Done: done,
+       })
+}
diff --git a/transport/jsonrpc/builder.go b/transport/jsonrpc/builder.go
new file mode 100644
index 00000000..74807bd1
--- /dev/null
+++ b/transport/jsonrpc/builder.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "encoding/json"
+       "fmt"
+       "time"
+
+       "seata-go-ai-transport/common"
+)
+
+// ClientBuilder implements common.ClientBuilder for JSON-RPC
+type ClientBuilder struct{}
+
+var _ common.ClientBuilder = (*ClientBuilder)(nil)
+
+// Build creates a new JSON-RPC client with the given config
+func (b *ClientBuilder) Build(config *common.Config) (common.Client, error) {
+       if config.Protocol != common.ProtocolJSONRPC {
+               return nil, fmt.Errorf("invalid protocol %s for JSON-RPC 
client", config.Protocol)
+       }
+
+       clientConfig := &ClientConfig{
+               Endpoint: config.Address,
+               Timeout:  30 * time.Second,
+               Headers:  make(map[string]string),
+       }
+
+       // Parse options
+       if config.Options != nil {
+               if endpoint, ok := config.Options["endpoint"].(string); ok {
+                       clientConfig.Endpoint = endpoint
+               }
+               if timeoutStr, ok := config.Options["timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(timeoutStr); err 
== nil {
+                               clientConfig.Timeout = timeout
+                       }
+               }
+               if timeout, ok := config.Options["timeout"].(time.Duration); ok 
{
+                       clientConfig.Timeout = timeout
+               }
+               if headers, ok := 
config.Options["headers"].(map[string]interface{}); ok {
+                       for k, v := range headers {
+                               if s, ok := v.(string); ok {
+                                       clientConfig.Headers[k] = s
+                               }
+                       }
+               }
+               if headers, ok := 
config.Options["headers"].(map[string]string); ok {
+                       for k, v := range headers {
+                               clientConfig.Headers[k] = v
+                       }
+               }
+       }
+
+       // Use address as endpoint if not specified in options
+       if clientConfig.Endpoint == "" {
+               clientConfig.Endpoint = config.Address
+       }
+
+       return NewClient(clientConfig), nil
+}
+
+// Protocol returns the protocol this builder supports
+func (b *ClientBuilder) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// ServerBuilder implements common.ServerBuilder for JSON-RPC
+type ServerBuilder struct{}
+
+var _ common.ServerBuilder = (*ServerBuilder)(nil)
+
+// Build creates a new JSON-RPC server with the given config
+func (b *ServerBuilder) Build(config *common.Config) (common.Server, error) {
+       if config.Protocol != common.ProtocolJSONRPC {
+               return nil, fmt.Errorf("invalid protocol %s for JSON-RPC 
server", config.Protocol)
+       }
+
+       serverConfig := &ServerConfig{
+               Address:      config.Address,
+               ReadTimeout:  30 * time.Second,
+               WriteTimeout: 30 * time.Second,
+               IdleTimeout:  120 * time.Second,
+       }
+
+       // Parse options
+       if config.Options != nil {
+               if readTimeoutStr, ok := 
config.Options["read_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(readTimeoutStr); 
err == nil {
+                               serverConfig.ReadTimeout = timeout
+                       }
+               }
+               if readTimeout, ok := 
config.Options["read_timeout"].(time.Duration); ok {
+                       serverConfig.ReadTimeout = readTimeout
+               }
+               if writeTimeoutStr, ok := 
config.Options["write_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(writeTimeoutStr); 
err == nil {
+                               serverConfig.WriteTimeout = timeout
+                       }
+               }
+               if writeTimeout, ok := 
config.Options["write_timeout"].(time.Duration); ok {
+                       serverConfig.WriteTimeout = writeTimeout
+               }
+               if idleTimeoutStr, ok := 
config.Options["idle_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(idleTimeoutStr); 
err == nil {
+                               serverConfig.IdleTimeout = timeout
+                       }
+               }
+               if idleTimeout, ok := 
config.Options["idle_timeout"].(time.Duration); ok {
+                       serverConfig.IdleTimeout = idleTimeout
+               }
+       }
+
+       return NewServer(serverConfig), nil
+}
+
+// Protocol returns the protocol this builder supports
+func (b *ServerBuilder) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// NewClientConfig creates a client config from JSON
+func NewClientConfig(data []byte) (*ClientConfig, error) {
+       var config ClientConfig
+       if err := json.Unmarshal(data, &config); err != nil {
+               return nil, fmt.Errorf("failed to unmarshal client config: %w", 
err)
+       }
+       return &config, nil
+}
+
+// NewServerConfig creates a server config from JSON
+func NewServerConfig(data []byte) (*ServerConfig, error) {
+       var config ServerConfig
+       if err := json.Unmarshal(data, &config); err != nil {
+               return nil, fmt.Errorf("failed to unmarshal server config: %w", 
err)
+       }
+       return &config, nil
+}
diff --git a/transport/jsonrpc/client.go b/transport/jsonrpc/client.go
new file mode 100644
index 00000000..df1f7e16
--- /dev/null
+++ b/transport/jsonrpc/client.go
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "seata-go-ai-transport/common"
+)
+
+// Client implements the common.Client interface for JSON-RPC
+type Client struct {
+       endpoint   string
+       httpClient *http.Client
+       headers    map[string]string
+       idCounter  int64
+       closed     int32
+       mu         sync.RWMutex
+}
+
+// ClientConfig represents JSON-RPC client configuration
+type ClientConfig struct {
+       Endpoint string            `json:"endpoint"`
+       Timeout  time.Duration     `json:"timeout"`
+       Headers  map[string]string `json:"headers"`
+}
+
+var _ common.Client = (*Client)(nil)
+
+// NewClient creates a new JSON-RPC client
+func NewClient(config *ClientConfig) *Client {
+       timeout := config.Timeout
+       if timeout == 0 {
+               timeout = 30 * time.Second
+       }
+
+       headers := make(map[string]string)
+       if config.Headers != nil {
+               for k, v := range config.Headers {
+                       headers[k] = v
+               }
+       }
+
+       // Set default content type if not specified
+       if _, exists := headers["Content-Type"]; !exists {
+               headers["Content-Type"] = "application/json"
+       }
+
+       return &Client{
+               endpoint: config.Endpoint,
+               httpClient: &http.Client{
+                       Timeout: timeout,
+               },
+               headers: headers,
+       }
+}
+
+// Call makes a unary RPC call
+func (c *Client) Call(ctx context.Context, msg *common.Message) 
(*common.Response, error) {
+       if atomic.LoadInt32(&c.closed) == 1 {
+               return nil, common.ErrClientClosed
+       }
+
+       // Generate unique ID for the request
+       id := atomic.AddInt64(&c.idCounter, 1)
+
+       // Create JSON-RPC request
+       var params interface{}
+       if len(msg.Payload) > 0 {
+               if err := json.Unmarshal(msg.Payload, &params); err != nil {
+                       return nil, fmt.Errorf("failed to unmarshal payload: 
%w", err)
+               }
+       }
+
+       req, err := NewRequest(msg.Method, params, id)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create request: %w", err)
+       }
+
+       // Marshal request to JSON
+       reqData, err := json.Marshal(req)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal request: %w", err)
+       }
+
+       // Create HTTP request
+       httpReq, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, 
bytes.NewReader(reqData))
+       if err != nil {
+               return nil, fmt.Errorf("failed to create HTTP request: %w", err)
+       }
+
+       // Set headers
+       c.mu.RLock()
+       for k, v := range c.headers {
+               httpReq.Header.Set(k, v)
+       }
+       // Add message headers
+       for k, v := range msg.Headers {
+               httpReq.Header.Set(k, v)
+       }
+       c.mu.RUnlock()
+
+       // Send request
+       httpResp, err := c.httpClient.Do(httpReq)
+       if err != nil {
+               return nil, fmt.Errorf("failed to send request: %w", err)
+       }
+       defer func(Body io.ReadCloser) {
+               _ = Body.Close()
+       }(httpResp.Body)
+
+       // Read response body
+       respData, err := io.ReadAll(httpResp.Body)
+       if err != nil {
+               return nil, fmt.Errorf("failed to read response: %w", err)
+       }
+
+       // Check HTTP status
+       if httpResp.StatusCode != http.StatusOK {
+               return nil, fmt.Errorf("HTTP error: %d %s", 
httpResp.StatusCode, string(respData))
+       }
+
+       // Parse JSON-RPC response
+       var jsonResp JSONRPCResponse
+       if err := json.Unmarshal(respData, &jsonResp); err != nil {
+               return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+       }
+
+       // Create common response
+       resp := &common.Response{
+               Headers: make(map[string]string),
+       }
+
+       // Copy HTTP headers to response headers
+       for k, values := range httpResp.Header {
+               if len(values) > 0 {
+                       resp.Headers[k] = values[0]
+               }
+       }
+
+       // Handle JSON-RPC error
+       if jsonResp.Error != nil {
+               resp.Error = jsonResp.Error
+               return resp, nil
+       }
+
+       // Set response data
+       resp.Data = jsonResp.Result
+
+       return resp, nil
+}
+
+// Stream makes a streaming RPC call (not supported for basic JSON-RPC)
+func (c *Client) Stream(ctx context.Context, msg *common.Message) 
(common.StreamReader, error) {
+       return nil, fmt.Errorf("streaming not supported by basic JSON-RPC 
client, use SSE client instead")
+}
+
+// Protocol returns the underlying protocol
+func (c *Client) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// Close closes the client connection
+func (c *Client) Close() error {
+       if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
+               return common.ErrClientClosed
+       }
+
+       c.httpClient.CloseIdleConnections()
+       return nil
+}
+
+// SetHeader sets a header for all requests
+func (c *Client) SetHeader(key, value string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       c.headers[key] = value
+}
+
+// RemoveHeader removes a header
+func (c *Client) RemoveHeader(key string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       delete(c.headers, key)
+}
diff --git a/transport/jsonrpc/jsonrpc.go b/transport/jsonrpc/jsonrpc.go
new file mode 100644
index 00000000..fcc5915b
--- /dev/null
+++ b/transport/jsonrpc/jsonrpc.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "errors"
+       "seata-go-ai-transport/common"
+)
+
+// Package initialization - register builders with the common registry
+func init() {
+       // Register SSE-enabled builders as the default for JSON-RPC
+       // These support both regular and streaming operations
+       common.RegisterClientBuilder(common.ProtocolJSONRPC, 
&SSEClientBuilder{})
+       common.RegisterServerBuilder(common.ProtocolJSONRPC, 
&SSEServerBuilder{})
+}
+
+// Public constructors for direct usage
+
+// NewJSONRPCClient creates a new JSON-RPC client directly
+func NewJSONRPCClient(endpoint string) *Client {
+       return NewClient(&ClientConfig{
+               Endpoint: endpoint,
+               Headers:  make(map[string]string),
+       })
+}
+
+// NewJSONRPCServer creates a new JSON-RPC server directly
+func NewJSONRPCServer(address string) *Server {
+       return NewServer(&ServerConfig{
+               Address: address,
+       })
+}
+
+// Utility functions for working with JSON-RPC
+
+// IsJSONRPCError checks if an error is a JSON-RPC error
+func IsJSONRPCError(err error) bool {
+       var JSONRPCError *JSONRPCError
+       ok := errors.As(err, &JSONRPCError)
+       return ok
+}
+
+// GetJSONRPCError extracts JSON-RPC error from error
+func GetJSONRPCError(err error) (*JSONRPCError, bool) {
+       var jsonErr *JSONRPCError
+       ok := errors.As(err, &jsonErr)
+       return jsonErr, ok
+}
+
+// CreateTransportError creates a transport error from JSON-RPC error
+func CreateTransportError(jsonErr *JSONRPCError) *common.TransportError {
+       return common.NewTransportError(
+               common.ProtocolJSONRPC,
+               jsonErr.Code,
+               jsonErr.Message,
+               jsonErr,
+       )
+}
diff --git a/transport/jsonrpc/server.go b/transport/jsonrpc/server.go
new file mode 100644
index 00000000..e6d7a771
--- /dev/null
+++ b/transport/jsonrpc/server.go
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net"
+       "net/http"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "seata-go-ai-transport/common"
+)
+
+// Server implements the common.Server interface for JSON-RPC
+type Server struct {
+       address        string
+       server         *http.Server
+       handlers       map[string]common.Handler
+       streamHandlers map[string]common.StreamHandler
+       mu             sync.RWMutex
+       started        int32
+       closed         int32
+}
+
+// ServerConfig represents JSON-RPC server configuration
+type ServerConfig struct {
+       Address      string        `json:"address"`
+       ReadTimeout  time.Duration `json:"read_timeout"`
+       WriteTimeout time.Duration `json:"write_timeout"`
+       IdleTimeout  time.Duration `json:"idle_timeout"`
+}
+
+var _ common.Server = (*Server)(nil)
+
+// NewServer creates a new JSON-RPC server
+func NewServer(config *ServerConfig) *Server {
+       mux := http.NewServeMux()
+       server := &Server{
+               address:        config.Address,
+               handlers:       make(map[string]common.Handler),
+               streamHandlers: make(map[string]common.StreamHandler),
+               server: &http.Server{
+                       Addr:         config.Address,
+                       Handler:      mux,
+                       ReadTimeout:  config.ReadTimeout,
+                       WriteTimeout: config.WriteTimeout,
+                       IdleTimeout:  config.IdleTimeout,
+               },
+       }
+
+       // Register JSON-RPC endpoint
+       mux.HandleFunc("/", server.handleJSONRPC)
+
+       return server
+}
+
+// Serve starts the server
+func (s *Server) Serve() error {
+       if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
+               return fmt.Errorf("server already started")
+       }
+
+       listener, err := net.Listen("tcp", s.address)
+       if err != nil {
+               atomic.StoreInt32(&s.started, 0)
+               return fmt.Errorf("failed to listen on %s: %w", s.address, err)
+       }
+
+       return s.server.Serve(listener)
+}
+
+// Stop stops the server gracefully
+func (s *Server) Stop(ctx context.Context) error {
+       if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+               return common.ErrServerClosed
+       }
+
+       return s.server.Shutdown(ctx)
+}
+
+// RegisterHandler registers a handler for a method
+func (s *Server) RegisterHandler(method string, handler common.Handler) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       s.handlers[method] = handler
+}
+
+// RegisterStreamHandler registers a streaming handler
+func (s *Server) RegisterStreamHandler(method string, handler 
common.StreamHandler) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       s.streamHandlers[method] = handler
+}
+
+// Protocol returns the underlying protocol
+func (s *Server) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// handleJSONRPC handles JSON-RPC requests
+func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) {
+       if r.Method != http.MethodPost {
+               http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+
+       // Set response headers
+       w.Header().Set("Content-Type", "application/json")
+
+       // Read request body
+       body, err := io.ReadAll(r.Body)
+       if err != nil {
+               s.writeErrorResponse(w, ParseError, nil)
+               return
+       }
+
+       // Parse JSON-RPC request
+       var req JSONRPCRequest
+       if err := json.Unmarshal(body, &req); err != nil {
+               s.writeErrorResponse(w, ParseError, nil)
+               return
+       }
+
+       // Validate JSON-RPC version
+       if req.JSONRPC != Version {
+               s.writeErrorResponse(w, InvalidRequest, req.ID)
+               return
+       }
+
+       // Handle request
+       s.handleRequest(w, r, &req)
+}
+
+// handleRequest handles a single JSON-RPC request
+func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request, req 
*JSONRPCRequest) {
+       // Get handler
+       s.mu.RLock()
+       handler, exists := s.handlers[req.Method]
+       s.mu.RUnlock()
+
+       if !exists {
+               if !req.IsNotification() {
+                       s.writeErrorResponse(w, MethodNotFound, req.ID)
+               }
+               return
+       }
+
+       // Create common message
+       msg := &common.Message{
+               Method:  req.Method,
+               Payload: req.Params,
+               Headers: make(map[string]string),
+       }
+
+       // Copy HTTP headers
+       for k, values := range r.Header {
+               if len(values) > 0 {
+                       msg.Headers[k] = values[0]
+               }
+       }
+
+       // Call handler
+       resp, err := handler(r.Context(), msg)
+       if err != nil {
+               if !req.IsNotification() {
+                       // Convert error to JSON-RPC error
+                       var jsonErr *JSONRPCError
+                       if transportErr, ok := common.GetTransportError(err); 
ok {
+                               jsonErr = NewJSONRPCError(transportErr.Code, 
transportErr.Message, transportErr.Cause)
+                       } else {
+                               jsonErr = NewJSONRPCError(InternalErrorCode, 
err.Error(), nil)
+                       }
+                       s.writeErrorResponse(w, jsonErr, req.ID)
+               }
+               return
+       }
+
+       // For notifications, don't send response
+       if req.IsNotification() {
+               w.WriteHeader(http.StatusNoContent)
+               return
+       }
+
+       // Create response
+       var result interface{}
+       if len(resp.Data) > 0 {
+               if err := json.Unmarshal(resp.Data, &result); err != nil {
+                       s.writeErrorResponse(w, 
NewJSONRPCError(InternalErrorCode, "Failed to unmarshal result", err.Error()), 
req.ID)
+                       return
+               }
+       }
+
+       jsonResp, err := NewResponse(result, req.ID)
+       if err != nil {
+               s.writeErrorResponse(w, NewJSONRPCError(InternalErrorCode, 
"Failed to create response", err.Error()), req.ID)
+               return
+       }
+
+       // Set response headers
+       for k, v := range resp.Headers {
+               w.Header().Set(k, v)
+       }
+
+       // Write response
+       respData, err := json.Marshal(jsonResp)
+       if err != nil {
+               s.writeErrorResponse(w, NewJSONRPCError(InternalErrorCode, 
"Failed to marshal response", err.Error()), req.ID)
+               return
+       }
+
+       w.WriteHeader(http.StatusOK)
+       w.Write(respData)
+}
+
+// writeErrorResponse writes a JSON-RPC error response
+func (s *Server) writeErrorResponse(w http.ResponseWriter, jsonErr 
*JSONRPCError, id interface{}) {
+       resp := NewErrorResponse(jsonErr, id)
+       data, err := json.Marshal(resp)
+       if err != nil {
+               // If we can't marshal the error response, send a basic HTTP 
error
+               http.Error(w, "Internal Server Error", 
http.StatusInternalServerError)
+               return
+       }
+
+       w.WriteHeader(http.StatusOK) // JSON-RPC errors are still HTTP 200
+       w.Write(data)
+}
diff --git a/transport/jsonrpc/sse_builder.go b/transport/jsonrpc/sse_builder.go
new file mode 100644
index 00000000..ed081b7d
--- /dev/null
+++ b/transport/jsonrpc/sse_builder.go
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "fmt"
+       "time"
+
+       "seata-go-ai-transport/common"
+)
+
+// SSEClientBuilder implements common.ClientBuilder for SSE-enabled JSON-RPC
+type SSEClientBuilder struct{}
+
+var _ common.ClientBuilder = (*SSEClientBuilder)(nil)
+
+// Build creates a new SSE JSON-RPC client with the given config
+func (b *SSEClientBuilder) Build(config *common.Config) (common.Client, error) 
{
+       if config.Protocol != common.ProtocolJSONRPC {
+               return nil, fmt.Errorf("invalid protocol %s for SSE JSON-RPC 
client", config.Protocol)
+       }
+
+       clientConfig := &ClientConfig{
+               Endpoint: config.Address,
+               Timeout:  30 * time.Second,
+               Headers:  make(map[string]string),
+       }
+
+       sseConfig := &SSEClientConfig{
+               ClientConfig: clientConfig,
+       }
+
+       // Parse options
+       if config.Options != nil {
+               if endpoint, ok := config.Options["endpoint"].(string); ok {
+                       clientConfig.Endpoint = endpoint
+               }
+               if streamEndpoint, ok := 
config.Options["stream_endpoint"].(string); ok {
+                       sseConfig.StreamEndpoint = streamEndpoint
+               }
+               if timeoutStr, ok := config.Options["timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(timeoutStr); err 
== nil {
+                               clientConfig.Timeout = timeout
+                       }
+               }
+               if timeout, ok := config.Options["timeout"].(time.Duration); ok 
{
+                       clientConfig.Timeout = timeout
+               }
+               if headers, ok := 
config.Options["headers"].(map[string]interface{}); ok {
+                       for k, v := range headers {
+                               if s, ok := v.(string); ok {
+                                       clientConfig.Headers[k] = s
+                               }
+                       }
+               }
+               if headers, ok := 
config.Options["headers"].(map[string]string); ok {
+                       for k, v := range headers {
+                               clientConfig.Headers[k] = v
+                       }
+               }
+               // Support streaming flag to determine if SSE should be used
+               if streaming, ok := config.Options["streaming"].(bool); ok && 
streaming {
+                       return NewSSEClient(sseConfig), nil
+               }
+       }
+
+       // Use address as endpoint if not specified in options
+       if clientConfig.Endpoint == "" {
+               clientConfig.Endpoint = config.Address
+       }
+
+       // Return SSE client by default for JSON-RPC with streaming capability
+       return NewSSEClient(sseConfig), nil
+}
+
+// Protocol returns the protocol this builder supports
+func (b *SSEClientBuilder) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// SSEServerBuilder implements common.ServerBuilder for SSE-enabled JSON-RPC
+type SSEServerBuilder struct{}
+
+var _ common.ServerBuilder = (*SSEServerBuilder)(nil)
+
+// Build creates a new SSE JSON-RPC server with the given config
+func (b *SSEServerBuilder) Build(config *common.Config) (common.Server, error) 
{
+       if config.Protocol != common.ProtocolJSONRPC {
+               return nil, fmt.Errorf("invalid protocol %s for SSE JSON-RPC 
server", config.Protocol)
+       }
+
+       serverConfig := &ServerConfig{
+               Address:      config.Address,
+               ReadTimeout:  30 * time.Second,
+               WriteTimeout: 30 * time.Second,
+               IdleTimeout:  120 * time.Second,
+       }
+
+       sseConfig := &SSEServerConfig{
+               ServerConfig: serverConfig,
+       }
+
+       // Parse options
+       if config.Options != nil {
+               if streamPath, ok := config.Options["stream_path"].(string); ok 
{
+                       sseConfig.StreamPath = streamPath
+               }
+               if readTimeoutStr, ok := 
config.Options["read_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(readTimeoutStr); 
err == nil {
+                               serverConfig.ReadTimeout = timeout
+                       }
+               }
+               if readTimeout, ok := 
config.Options["read_timeout"].(time.Duration); ok {
+                       serverConfig.ReadTimeout = readTimeout
+               }
+               if writeTimeoutStr, ok := 
config.Options["write_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(writeTimeoutStr); 
err == nil {
+                               serverConfig.WriteTimeout = timeout
+                       }
+               }
+               if writeTimeout, ok := 
config.Options["write_timeout"].(time.Duration); ok {
+                       serverConfig.WriteTimeout = writeTimeout
+               }
+               if idleTimeoutStr, ok := 
config.Options["idle_timeout"].(string); ok {
+                       if timeout, err := time.ParseDuration(idleTimeoutStr); 
err == nil {
+                               serverConfig.IdleTimeout = timeout
+                       }
+               }
+               if idleTimeout, ok := 
config.Options["idle_timeout"].(time.Duration); ok {
+                       serverConfig.IdleTimeout = idleTimeout
+               }
+               // Support streaming flag to determine if SSE should be used
+               if streaming, ok := config.Options["streaming"].(bool); ok && 
streaming {
+                       return NewSSEServer(sseConfig), nil
+               }
+       }
+
+       // Return SSE server by default for JSON-RPC with streaming capability
+       return NewSSEServer(sseConfig), nil
+}
+
+// Protocol returns the protocol this builder supports
+func (b *SSEServerBuilder) Protocol() common.Protocol {
+       return common.ProtocolJSONRPC
+}
+
+// Convenience constructors
+
+// NewSSEJSONRPCClient creates a new SSE JSON-RPC client directly
+func NewSSEJSONRPCClient(endpoint string, streamEndpoint string) *SSEClient {
+       config := &SSEClientConfig{
+               ClientConfig: &ClientConfig{
+                       Endpoint: endpoint,
+                       Headers:  make(map[string]string),
+               },
+               StreamEndpoint: streamEndpoint,
+       }
+       return NewSSEClient(config)
+}
+
+// NewSSEJSONRPCServer creates a new SSE JSON-RPC server directly
+func NewSSEJSONRPCServer(address string, streamPath string) *SSEServer {
+       config := &SSEServerConfig{
+               ServerConfig: &ServerConfig{
+                       Address: address,
+               },
+               StreamPath: streamPath,
+       }
+       return NewSSEServer(config)
+}
diff --git a/transport/jsonrpc/sse_client.go b/transport/jsonrpc/sse_client.go
new file mode 100644
index 00000000..4f22916c
--- /dev/null
+++ b/transport/jsonrpc/sse_client.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "sync"
+       "sync/atomic"
+
+       "seata-go-ai-transport/common"
+)
+
+// SSEClient implements streaming JSON-RPC over Server-Sent Events
+type SSEClient struct {
+       *Client
+       streamEndpoint string
+}
+
+// SSEClientConfig extends ClientConfig with streaming endpoint
+type SSEClientConfig struct {
+       *ClientConfig
+       StreamEndpoint string `json:"stream_endpoint"`
+}
+
+var _ common.Client = (*SSEClient)(nil)
+
+// NewSSEClient creates a new SSE-enabled JSON-RPC client
+func NewSSEClient(config *SSEClientConfig) *SSEClient {
+       client := NewClient(config.ClientConfig)
+
+       streamEndpoint := config.StreamEndpoint
+       if streamEndpoint == "" {
+               streamEndpoint = config.Endpoint + "/stream"
+       }
+
+       return &SSEClient{
+               Client:         client,
+               streamEndpoint: streamEndpoint,
+       }
+}
+
+// Stream makes a streaming RPC call using SSE
+func (c *SSEClient) Stream(ctx context.Context, msg *common.Message) 
(common.StreamReader, error) {
+       if atomic.LoadInt32(&c.closed) == 1 {
+               return nil, common.ErrClientClosed
+       }
+
+       // Generate unique ID for the request
+       id := atomic.AddInt64(&c.idCounter, 1)
+
+       // Create JSON-RPC request
+       var params interface{}
+       if len(msg.Payload) > 0 {
+               if err := json.Unmarshal(msg.Payload, &params); err != nil {
+                       return nil, fmt.Errorf("failed to unmarshal payload: 
%w", err)
+               }
+       }
+
+       req, err := NewRequest(msg.Method, params, id)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create request: %w", err)
+       }
+
+       // Marshal request to JSON
+       reqData, err := json.Marshal(req)
+       if err != nil {
+               return nil, fmt.Errorf("failed to marshal request: %w", err)
+       }
+
+       // Create HTTP request for streaming
+       httpReq, err := http.NewRequestWithContext(ctx, "POST", 
c.streamEndpoint, bytes.NewReader(reqData))
+       if err != nil {
+               return nil, fmt.Errorf("failed to create HTTP request: %w", err)
+       }
+
+       // Set headers for SSE
+       c.mu.RLock()
+       for k, v := range c.headers {
+               httpReq.Header.Set(k, v)
+       }
+       // Add message headers
+       for k, v := range msg.Headers {
+               httpReq.Header.Set(k, v)
+       }
+       c.mu.RUnlock()
+
+       httpReq.Header.Set("Accept", "text/event-stream")
+       httpReq.Header.Set("Cache-Control", "no-cache")
+
+       // Send request
+       httpResp, err := c.httpClient.Do(httpReq)
+       if err != nil {
+               return nil, fmt.Errorf("failed to send request: %w", err)
+       }
+
+       // Check HTTP status
+       if httpResp.StatusCode != http.StatusOK {
+               _ = httpResp.Body.Close()
+               return nil, fmt.Errorf("HTTP error: %d", httpResp.StatusCode)
+       }
+
+       // Create stream reader
+       return NewSSEStreamReader(ctx, httpResp), nil
+}
+
+// SSEStreamReader implements common.StreamReader for SSE
+type SSEStreamReader struct {
+       ctx    context.Context
+       resp   *http.Response
+       parser *SSEParser
+       closed int32
+       mu     sync.Mutex
+}
+
+var _ common.StreamReader = (*SSEStreamReader)(nil)
+
+// NewSSEStreamReader creates a new SSE stream reader
+func NewSSEStreamReader(ctx context.Context, resp *http.Response) 
*SSEStreamReader {
+       return &SSEStreamReader{
+               ctx:    ctx,
+               resp:   resp,
+               parser: NewSSEParser(ctx, resp.Body),
+       }
+}
+
+// Recv receives the next message from the stream
+func (r *SSEStreamReader) Recv() (*common.StreamResponse, error) {
+       if atomic.LoadInt32(&r.closed) == 1 {
+               return nil, common.ErrStreamClosed
+       }
+
+       event, err := r.parser.NextEvent()
+       if err != nil {
+               return &common.StreamResponse{
+                       Done:  true,
+                       Error: err,
+               }, nil
+       }
+
+       // Parse the SSE data as JSON-RPC response
+       var jsonResp JSONRPCResponse
+       if err := json.Unmarshal([]byte(event.Data), &jsonResp); err != nil {
+               return &common.StreamResponse{
+                       Error: fmt.Errorf("failed to unmarshal SSE data: %w", 
err),
+               }, nil
+       }
+
+       // Create stream response
+       streamResp := &common.StreamResponse{
+               Headers: make(map[string]string),
+       }
+
+       // Copy headers from HTTP response
+       for k, values := range r.resp.Header {
+               if len(values) > 0 {
+                       streamResp.Headers[k] = values[0]
+               }
+       }
+
+       // Add SSE-specific headers
+       if event.ID != "" {
+               streamResp.Headers["SSE-Event-ID"] = event.ID
+       }
+       if event.Event != "" {
+               streamResp.Headers["SSE-Event-Type"] = event.Event
+       }
+
+       // Handle JSON-RPC error
+       if jsonResp.Error != nil {
+               streamResp.Error = jsonResp.Error
+               return streamResp, nil
+       }
+
+       // Set response data
+       streamResp.Data = jsonResp.Result
+
+       // Check for end of stream marker
+       if event.Event == "end" || event.Event == "close" {
+               streamResp.Done = true
+       }
+
+       return streamResp, nil
+}
+
+// Close closes the stream
+func (r *SSEStreamReader) Close() error {
+       if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
+               return common.ErrStreamClosed
+       }
+
+       return r.resp.Body.Close()
+}
diff --git a/transport/jsonrpc/sse_server.go b/transport/jsonrpc/sse_server.go
new file mode 100644
index 00000000..91af3227
--- /dev/null
+++ b/transport/jsonrpc/sse_server.go
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "net/http"
+       "sync"
+       "sync/atomic"
+
+       "seata-go-ai-transport/common"
+)
+
+// SSEServer extends the basic JSON-RPC server with SSE streaming support
+type SSEServer struct {
+       *Server // Embed regular server
+}
+
+// SSEServerConfig extends ServerConfig
+type SSEServerConfig struct {
+       *ServerConfig
+       StreamPath string `json:"stream_path"`
+}
+
+var _ common.Server = (*SSEServer)(nil)
+
+// NewSSEServer creates a new SSE-enabled JSON-RPC server
+func NewSSEServer(config *SSEServerConfig) *SSEServer {
+       server := NewServer(config.ServerConfig)
+
+       streamPath := config.StreamPath
+       if streamPath == "" {
+               streamPath = "/stream"
+       }
+
+       sseServer := &SSEServer{
+               Server: server,
+       }
+
+       // Add SSE streaming endpoint
+       mux := server.server.Handler.(*http.ServeMux)
+       mux.HandleFunc(streamPath, sseServer.handleSSEStream)
+
+       return sseServer
+}
+
+// handleSSEStream handles SSE streaming requests
+func (s *SSEServer) handleSSEStream(w http.ResponseWriter, r *http.Request) {
+       if r.Method != http.MethodPost {
+               http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+
+       // Set SSE headers
+       w.Header().Set("Content-Type", "text/event-stream")
+       w.Header().Set("Cache-Control", "no-cache")
+       w.Header().Set("Connection", "keep-alive")
+       w.Header().Set("Access-Control-Allow-Origin", "*")
+       w.Header().Set("Access-Control-Allow-Headers", "Cache-Control")
+
+       // Read request body
+       body, err := io.ReadAll(r.Body)
+       if err != nil {
+               s.writeSSEError(w, ParseError, nil)
+               return
+       }
+
+       // Parse JSON-RPC request
+       var req JSONRPCRequest
+       if err := json.Unmarshal(body, &req); err != nil {
+               s.writeSSEError(w, ParseError, nil)
+               return
+       }
+
+       // Validate JSON-RPC version
+       if req.JSONRPC != Version {
+               s.writeSSEError(w, InvalidRequest, req.ID)
+               return
+       }
+
+       // Handle streaming request
+       s.handleStreamRequest(w, r, &req)
+}
+
+// handleStreamRequest handles a single streaming JSON-RPC request
+func (s *SSEServer) handleStreamRequest(w http.ResponseWriter, r 
*http.Request, req *JSONRPCRequest) {
+       // Get streaming handler
+       s.mu.RLock()
+       handler, exists := s.streamHandlers[req.Method]
+       s.mu.RUnlock()
+
+       if !exists {
+               s.writeSSEError(w, MethodNotFound, req.ID)
+               return
+       }
+
+       // Create common message
+       msg := &common.Message{
+               Method:  req.Method,
+               Payload: req.Params,
+               Headers: make(map[string]string),
+       }
+
+       // Copy HTTP headers
+       for k, values := range r.Header {
+               if len(values) > 0 {
+                       msg.Headers[k] = values[0]
+               }
+       }
+
+       // Create stream writer
+       streamWriter := NewSSEStreamWriter(w, req.ID)
+
+       // Call streaming handler
+       if err := handler(r.Context(), msg, streamWriter); err != nil {
+               s.writeSSEError(w, NewJSONRPCError(InternalErrorCode, 
err.Error(), nil), req.ID)
+               return
+       }
+
+       // Send end event
+       streamWriter.sendEndEvent()
+}
+
+// writeSSEError writes a JSON-RPC error as SSE event
+func (s *SSEServer) writeSSEError(w http.ResponseWriter, jsonErr 
*JSONRPCError, id interface{}) {
+       resp := NewErrorResponse(jsonErr, id)
+       data, err := json.Marshal(resp)
+       if err != nil {
+               // If we can't marshal the error response, send a basic error 
event
+               event := &SSEEvent{
+                       Event: "error",
+                       Data:  "Internal Server Error",
+               }
+               fmt.Fprint(w, event.String())
+               return
+       }
+
+       event := &SSEEvent{
+               Event: "error",
+               Data:  string(data),
+       }
+       fmt.Fprint(w, event.String())
+
+       if f, ok := w.(http.Flusher); ok {
+               f.Flush()
+       }
+}
+
+// SSEStreamWriter implements common.StreamWriter for SSE
+type SSEStreamWriter struct {
+       writer  http.ResponseWriter
+       flusher http.Flusher
+       id      interface{}
+       eventID int64
+       closed  int32
+       mu      sync.Mutex
+}
+
+var _ common.StreamWriter = (*SSEStreamWriter)(nil)
+
+// NewSSEStreamWriter creates a new SSE stream writer
+func NewSSEStreamWriter(w http.ResponseWriter, id interface{}) 
*SSEStreamWriter {
+       flusher, _ := w.(http.Flusher)
+       return &SSEStreamWriter{
+               writer:  w,
+               flusher: flusher,
+               id:      id,
+       }
+}
+
+// Send sends a message to the stream
+func (w *SSEStreamWriter) Send(resp *common.StreamResponse) error {
+       if atomic.LoadInt32(&w.closed) == 1 {
+               return common.ErrStreamClosed
+       }
+
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       // Handle error response
+       if resp.Error != nil {
+               var jsonErr *JSONRPCError
+               var jsonRpcErr *JSONRPCError
+               if errors.As(resp.Error, &jsonRpcErr) {
+                       jsonErr = jsonRpcErr
+               }
+
+               errorResp := NewErrorResponse(jsonErr, w.id)
+               data, err := json.Marshal(errorResp)
+               if err != nil {
+                       return fmt.Errorf("failed to marshal error response: 
%w", err)
+               }
+
+               event := &SSEEvent{
+                       ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 
1)),
+                       Event: "error",
+                       Data:  string(data),
+               }
+
+               if _, err := fmt.Fprint(w.writer, event.String()); err != nil {
+                       return fmt.Errorf("failed to write SSE event: %w", err)
+               }
+
+               if w.flusher != nil {
+                       w.flusher.Flush()
+               }
+               return nil
+       }
+
+       // Create JSON-RPC response
+       var result interface{}
+       if len(resp.Data) > 0 {
+               if err := json.Unmarshal(resp.Data, &result); err != nil {
+                       return fmt.Errorf("failed to unmarshal response data: 
%w", err)
+               }
+       }
+
+       jsonResp, err := NewResponse(result, w.id)
+       if err != nil {
+               return fmt.Errorf("failed to create JSON-RPC response: %w", err)
+       }
+
+       data, err := json.Marshal(jsonResp)
+       if err != nil {
+               return fmt.Errorf("failed to marshal JSON-RPC response: %w", 
err)
+       }
+
+       // Determine event type
+       eventType := "data"
+       if resp.Done {
+               eventType = "end"
+       }
+
+       event := &SSEEvent{
+               ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 1)),
+               Event: eventType,
+               Data:  string(data),
+       }
+
+       if _, err := fmt.Fprint(w.writer, event.String()); err != nil {
+               return fmt.Errorf("failed to write SSE event: %w", err)
+       }
+
+       if w.flusher != nil {
+               w.flusher.Flush()
+       }
+
+       return nil
+}
+
+// Close closes the stream
+func (w *SSEStreamWriter) Close() error {
+       if !atomic.CompareAndSwapInt32(&w.closed, 0, 1) {
+               return common.ErrStreamClosed
+       }
+
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       // Send close event
+       event := &SSEEvent{
+               ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 1)),
+               Event: "close",
+               Data:  "",
+       }
+
+       fmt.Fprint(w.writer, event.String())
+
+       if w.flusher != nil {
+               w.flusher.Flush()
+       }
+
+       return nil
+}
+
+// sendEndEvent sends an end event to mark the stream as complete
+func (w *SSEStreamWriter) sendEndEvent() {
+       if atomic.LoadInt32(&w.closed) == 1 {
+               return
+       }
+
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       event := &SSEEvent{
+               ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 1)),
+               Event: "end",
+               Data:  "",
+       }
+
+       fmt.Fprint(w.writer, event.String())
+
+       if w.flusher != nil {
+               w.flusher.Flush()
+       }
+}
diff --git a/transport/jsonrpc/sse_types.go b/transport/jsonrpc/sse_types.go
new file mode 100644
index 00000000..2be57088
--- /dev/null
+++ b/transport/jsonrpc/sse_types.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "bufio"
+       "context"
+       "fmt"
+       "io"
+       "strings"
+)
+
+// SSEEvent represents a Server-Sent Event
+type SSEEvent struct {
+       ID    string `json:"id,omitempty"`
+       Event string `json:"event,omitempty"`
+       Data  string `json:"data"`
+       Retry int    `json:"retry,omitempty"`
+}
+
+// String formats the SSE event for transmission
+func (e *SSEEvent) String() string {
+       var sb strings.Builder
+
+       if e.ID != "" {
+               sb.WriteString(fmt.Sprintf("id: %s\n", e.ID))
+       }
+       if e.Event != "" {
+               sb.WriteString(fmt.Sprintf("event: %s\n", e.Event))
+       }
+       if e.Retry > 0 {
+               sb.WriteString(fmt.Sprintf("retry: %d\n", e.Retry))
+       }
+
+       // Handle multi-line data
+       lines := strings.Split(e.Data, "\n")
+       for _, line := range lines {
+               sb.WriteString(fmt.Sprintf("data: %s\n", line))
+       }
+
+       sb.WriteString("\n")
+       return sb.String()
+}
+
+// SSEParser parses Server-Sent Events from a reader
+type SSEParser struct {
+       scanner *bufio.Scanner
+       ctx     context.Context
+}
+
+// NewSSEParser creates a new SSE parser
+func NewSSEParser(ctx context.Context, reader io.Reader) *SSEParser {
+       return &SSEParser{
+               scanner: bufio.NewScanner(reader),
+               ctx:     ctx,
+       }
+}
+
+// NextEvent reads the next SSE event
+func (p *SSEParser) NextEvent() (*SSEEvent, error) {
+       event := &SSEEvent{}
+       var dataLines []string
+
+       for {
+               select {
+               case <-p.ctx.Done():
+                       return nil, p.ctx.Err()
+               default:
+               }
+
+               if !p.scanner.Scan() {
+                       if err := p.scanner.Err(); err != nil {
+                               return nil, fmt.Errorf("scanner error: %w", err)
+                       }
+                       // EOF
+                       if len(dataLines) > 0 || event.ID != "" || event.Event 
!= "" {
+                               event.Data = strings.Join(dataLines, "\n")
+                               return event, nil
+                       }
+                       return nil, io.EOF
+               }
+
+               line := p.scanner.Text()
+
+               // Empty line indicates end of event
+               if line == "" {
+                       if len(dataLines) > 0 || event.ID != "" || event.Event 
!= "" {
+                               event.Data = strings.Join(dataLines, "\n")
+                               return event, nil
+                       }
+                       continue
+               }
+
+               // Skip comments
+               if strings.HasPrefix(line, ":") {
+                       continue
+               }
+
+               // Parse field
+               colonIndex := strings.Index(line, ":")
+               if colonIndex == -1 {
+                       // Field name only
+                       switch line {
+                       case "data":
+                               dataLines = append(dataLines, "")
+                       }
+                       continue
+               }
+
+               field := line[:colonIndex]
+               value := line[colonIndex+1:]
+
+               // Remove leading space from value
+               if len(value) > 0 && value[0] == ' ' {
+                       value = value[1:]
+               }
+
+               switch field {
+               case "id":
+                       event.ID = value
+               case "event":
+                       event.Event = value
+               case "data":
+                       dataLines = append(dataLines, value)
+               case "retry":
+                       // Parse retry value (not implemented in this basic 
version)
+               }
+       }
+}
diff --git a/transport/jsonrpc/types.go b/transport/jsonrpc/types.go
new file mode 100644
index 00000000..f766c5d1
--- /dev/null
+++ b/transport/jsonrpc/types.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+       "encoding/json"
+       "fmt"
+)
+
+// Version JSONRPC version constant
+const Version = "2.0"
+
+// JSONRPCRequest represents a JSON-RPC 2.0 request
+type JSONRPCRequest struct {
+       JSONRPC string          `json:"jsonrpc"`
+       Method  string          `json:"method"`
+       Params  json.RawMessage `json:"params,omitempty"`
+       ID      interface{}     `json:"id,omitempty"`
+}
+
+// JSONRPCResponse represents a JSON-RPC 2.0 response
+type JSONRPCResponse struct {
+       JSONRPC string          `json:"jsonrpc"`
+       Result  json.RawMessage `json:"result,omitempty"`
+       Error   *JSONRPCError   `json:"error,omitempty"`
+       ID      interface{}     `json:"id,omitempty"`
+}
+
+// JSONRPCError represents a JSON-RPC 2.0 error
+type JSONRPCError struct {
+       Code    int         `json:"code"`
+       Message string      `json:"message"`
+       Data    interface{} `json:"data,omitempty"`
+}
+
+// Error implements the error interface
+func (e *JSONRPCError) Error() string {
+       if e.Data != nil {
+               return fmt.Sprintf("jsonrpc error [%d]: %s (data: %v)", e.Code, 
e.Message, e.Data)
+       }
+       return fmt.Sprintf("jsonrpc error [%d]: %s", e.Code, e.Message)
+}
+
+// Standard JSON-RPC error codes
+const (
+       ParseErrorCode     = -32700
+       InvalidRequestCode = -32600
+       MethodNotFoundCode = -32601
+       InvalidParamsCode  = -32602
+       InternalErrorCode  = -32603
+       ServerErrorCode    = -32000 // to -32099 range for server errors
+)
+
+// Standard JSON-RPC errors
+var (
+       ParseError     = &JSONRPCError{Code: ParseErrorCode, Message: "Parse 
error"}
+       InvalidRequest = &JSONRPCError{Code: InvalidRequestCode, Message: 
"Invalid Request"}
+       MethodNotFound = &JSONRPCError{Code: MethodNotFoundCode, Message: 
"Method not found"}
+       InvalidParams  = &JSONRPCError{Code: InvalidParamsCode, Message: 
"Invalid params"}
+       InternalError  = &JSONRPCError{Code: InternalErrorCode, Message: 
"Internal error"}
+)
+
+// NewJSONRPCError creates a new JSON-RPC error
+func NewJSONRPCError(code int, message string, data interface{}) *JSONRPCError 
{
+       return &JSONRPCError{
+               Code:    code,
+               Message: message,
+               Data:    data,
+       }
+}
+
+// NewRequest creates a new JSON-RPC request
+func NewRequest(method string, params interface{}, id interface{}) 
(*JSONRPCRequest, error) {
+       var paramsBytes json.RawMessage
+       if params != nil {
+               data, err := json.Marshal(params)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to marshal params: %w", 
err)
+               }
+               paramsBytes = data
+       }
+
+       return &JSONRPCRequest{
+               JSONRPC: Version,
+               Method:  method,
+               Params:  paramsBytes,
+               ID:      id,
+       }, nil
+}
+
+// NewResponse creates a new JSON-RPC response with result
+func NewResponse(result interface{}, id interface{}) (*JSONRPCResponse, error) 
{
+       var resultBytes json.RawMessage
+       if result != nil {
+               data, err := json.Marshal(result)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to marshal result: %w", 
err)
+               }
+               resultBytes = data
+       }
+
+       return &JSONRPCResponse{
+               JSONRPC: Version,
+               Result:  resultBytes,
+               ID:      id,
+       }, nil
+}
+
+// NewErrorResponse creates a new JSON-RPC error response
+func NewErrorResponse(err *JSONRPCError, id interface{}) *JSONRPCResponse {
+       return &JSONRPCResponse{
+               JSONRPC: Version,
+               Error:   err,
+               ID:      id,
+       }
+}
+
+// IsNotification checks if the request is a notification (no ID)
+func (r *JSONRPCRequest) IsNotification() bool {
+       return r.ID == nil
+}
+
+// UnmarshalParams unmarshals the request params into the given value
+func (r *JSONRPCRequest) UnmarshalParams(v interface{}) error {
+       if len(r.Params) == 0 {
+               return nil
+       }
+       return json.Unmarshal(r.Params, v)
+}
+
+// UnmarshalResult unmarshals the response result into the given value
+func (r *JSONRPCResponse) UnmarshalResult(v interface{}) error {
+       if r.Error != nil {
+               return r.Error
+       }
+       if len(r.Result) == 0 {
+               return nil
+       }
+       return json.Unmarshal(r.Result, v)
+}
diff --git a/transport/transport.go b/transport/transport.go
new file mode 100644
index 00000000..39434fa0
--- /dev/null
+++ b/transport/transport.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "seata-go-ai-transport/common"
+       _ "seata-go-ai-transport/grpc"
+       _ "seata-go-ai-transport/jsonrpc"
+)
+
+// Re-export common types for convenience
+type (
+       Protocol       = common.Protocol
+       StreamType     = common.StreamType
+       Message        = common.Message
+       Response       = common.Response
+       StreamResponse = common.StreamResponse
+       Client         = common.Client
+       Server         = common.Server
+       Handler        = common.Handler
+       StreamHandler  = common.StreamHandler
+       StreamReader   = common.StreamReader
+       StreamWriter   = common.StreamWriter
+       Config         = common.Config
+       ClientBuilder  = common.ClientBuilder
+       ServerBuilder  = common.ServerBuilder
+)
+
+// Re-export protocol constants
+const (
+       ProtocolGRPC    = common.ProtocolGRPC
+       ProtocolJSONRPC = common.ProtocolJSONRPC
+)
+
+// Re-export stream type constants
+const (
+       StreamTypeUnary        = common.StreamTypeUnary
+       StreamTypeServerStream = common.StreamTypeServerStream
+       StreamTypeClientStream = common.StreamTypeClientStream
+       StreamTypeBiDirStream  = common.StreamTypeBiDirStream
+)
+
+// Re-export registry functions
+var (
+       RegisterClientBuilder = common.RegisterClientBuilder
+       RegisterServerBuilder = common.RegisterServerBuilder
+       CreateClient          = common.CreateClient
+       CreateServer          = common.CreateServer
+       GetSupportedProtocols = common.GetSupportedProtocols
+)
+
+// Re-export utility functions
+var (
+       MarshalMessage            = common.MarshalMessage
+       UnmarshalResponse         = common.UnmarshalResponse
+       UnmarshalStreamResponse   = common.UnmarshalStreamResponse
+       CreateResponse            = common.CreateResponse
+       CreateStreamResponse      = common.CreateStreamResponse
+       CreateErrorResponse       = common.CreateErrorResponse
+       CreateErrorStreamResponse = common.CreateErrorStreamResponse
+)
+
+// Re-export common errors
+var (
+       ErrProtocolNotSupported = common.ErrProtocolNotSupported
+       ErrClientClosed         = common.ErrClientClosed
+       ErrServerClosed         = common.ErrServerClosed
+       ErrStreamClosed         = common.ErrStreamClosed
+       ErrInvalidConfig        = common.ErrInvalidConfig
+       ErrHandlerNotFound      = common.ErrHandlerNotFound
+       ErrTimeout              = common.ErrTimeout
+       ErrConnectionFailed     = common.ErrConnectionFailed
+)
+
+// Re-export error functions
+var (
+       NewTransportError = common.NewTransportError
+       IsTransportError  = common.IsTransportError
+       GetTransportError = common.GetTransportError
+)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to