This is an automated email from the ASF dual-hosted git repository.
zfeng pushed a commit to branch AI
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/AI by this push:
new 9c3c9d29 Ospp 2025 transport (#917)
9c3c9d29 is described below
commit 9c3c9d2962b23812739e53aee14ee834de5f46e1
Author: 深几许 <[email protected]>
AuthorDate: Thu Oct 30 00:06:40 2025 +0800
Ospp 2025 transport (#917)
* upd: clear branch
* feat: implement ospp_2025_transport
* feat: implement ospp_2025_a2a-protocol go.work
---------
Co-authored-by: FinnTew <[email protected]>
Co-authored-by: FinnTew <[email protected]>
---
go.work | 5 +-
go.work.sum | 46 ++++++
transport/README.md | 331 +++++++++++++++++++++++++++++++++++++++
transport/common/errors.go | 93 +++++++++++
transport/common/registry.go | 134 ++++++++++++++++
transport/common/types.go | 159 +++++++++++++++++++
transport/common/utils.go | 165 +++++++++++++++++++
transport/go.mod | 15 ++
transport/go.sum | 34 ++++
transport/grpc/builder.go | 187 ++++++++++++++++++++++
transport/grpc/client.go | 251 +++++++++++++++++++++++++++++
transport/grpc/grpc.go | 91 +++++++++++
transport/grpc/server.go | 257 ++++++++++++++++++++++++++++++
transport/grpc/stream_reader.go | 113 +++++++++++++
transport/grpc/stream_writer.go | 122 +++++++++++++++
transport/jsonrpc/builder.go | 155 ++++++++++++++++++
transport/jsonrpc/client.go | 208 ++++++++++++++++++++++++
transport/jsonrpc/jsonrpc.go | 74 +++++++++
transport/jsonrpc/server.go | 247 +++++++++++++++++++++++++++++
transport/jsonrpc/sse_builder.go | 185 ++++++++++++++++++++++
transport/jsonrpc/sse_client.go | 211 +++++++++++++++++++++++++
transport/jsonrpc/sse_server.go | 314 +++++++++++++++++++++++++++++++++++++
transport/jsonrpc/sse_types.go | 144 +++++++++++++++++
transport/jsonrpc/types.go | 155 ++++++++++++++++++
transport/transport.go | 95 +++++++++++
25 files changed, 3790 insertions(+), 1 deletion(-)
diff --git a/go.work b/go.work
index 5c403828..86dff4e2 100644
--- a/go.work
+++ b/go.work
@@ -1,3 +1,6 @@
go 1.24.5
-use ./agenthub
+use (
+ ./transport
+ ./agenthub
+)
\ No newline at end of file
diff --git a/go.work.sum b/go.work.sum
new file mode 100644
index 00000000..9fbb984b
--- /dev/null
+++ b/go.work.sum
@@ -0,0 +1,46 @@
+cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
+cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
+cloud.google.com/go/compute/metadata v0.7.0
h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
+cloud.google.com/go/compute/metadata v0.7.0/go.mod
h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
+github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp
v1.27.0 h1:ErKg/3iS1AKcTkf3yixlZ54f9U1rljCkQyEXWUnIUxc=
+github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp
v1.27.0/go.mod h1:yAZHSGnqScoU556rBOVkwLze6WP5N+U11RHuWaGVxwY=
+github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443
h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
+github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod
h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
+github.com/envoyproxy/go-control-plane v0.13.4
h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
+github.com/envoyproxy/go-control-plane v0.13.4/go.mod
h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4
h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
+github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod
h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0
h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
+github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod
h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
+github.com/envoyproxy/protoc-gen-validate v1.2.1
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
+github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/go-jose/go-jose/v4 v4.0.5
h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE=
+github.com/go-jose/go-jose/v4 v4.0.5/go.mod
h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA=
+github.com/golang/glog v1.2.5 h1:DrW6hGnjIhtvhOIiAKT6Psh/Kd/ldepEa81DKeiRJ5I=
+github.com/golang/glog v1.2.5/go.mod
h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
+github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod
h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+github.com/spiffe/go-spiffe/v2 v2.5.0
h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
+github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod
h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
+github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
+github.com/zeebo/errs v1.4.0/go.mod
h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
+go.opentelemetry.io/contrib/detectors/gcp v1.36.0
h1:F7q2tNlCaHY9nMKHR6XH9/qkp8FktLnIcy6jJNyOCQw=
+go.opentelemetry.io/contrib/detectors/gcp v1.36.0/go.mod
h1:IbBN8uAIIx734PTonTPxAxnjc2pQTxWNkwfstZ+6H2k=
+golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
+golang.org/x/crypto v0.38.0/go.mod
h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
+golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
+golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
+golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
+golang.org/x/oauth2 v0.30.0/go.mod
h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
+golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
+golang.org/x/sync v0.14.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
+golang.org/x/term v0.32.0/go.mod
h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod
h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/genproto/googleapis/api v0.0.0-20250528174236-200df99c418a
h1:SGktgSolFCo75dnHJF2yMvnns6jCmHFJ0vE4Vn2JKvQ=
+google.golang.org/genproto/googleapis/api
v0.0.0-20250528174236-200df99c418a/go.mod
h1:a77HrdMjoeKbnd2jmgcWdaS++ZLZAEq3orIOAEIKiVw=
diff --git a/transport/README.md b/transport/README.md
new file mode 100644
index 00000000..b7f6e17c
--- /dev/null
+++ b/transport/README.md
@@ -0,0 +1,331 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+-->
+
+# Seata Go AI Transport Layer Implementation
+
+This package provides a unified transport layer for both JSON-RPC and gRPC
protocols, supporting both streaming and non-streaming operations.
+
+## Features
+
+### Protocols Supported
+- **JSON-RPC 2.0**: HTTP-based with Server-Sent Events (SSE) for streaming
+- **gRPC**: Full support including server streaming RPCs
+
+### Core Capabilities
+- **Unified Interface**: Common abstractions for both protocols
+- **Streaming Support**:
+ - JSON-RPC: SSE-based streaming
+ - gRPC: Server streaming RPCs
+- **Registry Pattern**: Builders are registered per protocol and selected through `Config.Protocol`
+- **Type Safety**: Proper interface contracts with compile-time checks
+- **Error Handling**: Protocol-specific error types with common interface
+- **Extensible**: Easy to add new transport protocols
+
+## Architecture
+
+### Package Structure
+```
+transport/
+├── common/ # Common interfaces and utilities
+│ ├── types.go # Core types and interfaces
+│ ├── registry.go # Protocol registry and factory
+│ ├── utils.go # Utility functions
+│ └── errors.go # Error definitions
+├── jsonrpc/ # JSON-RPC implementation
+│ ├── types.go # JSON-RPC specific types
+│ ├── client.go # HTTP client implementation
+│ ├── server.go # HTTP server implementation
+│ ├── sse_*.go # Server-Sent Events streaming
+│ ├── builder.go # Factory builders
+│ └── jsonrpc.go # Package exports and registration
+├── grpc/ # gRPC implementation
+│ ├── client.go # gRPC client wrapper
+│ ├── server.go # gRPC server wrapper
+│ ├── stream_*.go # Streaming implementations
+│ ├── builder.go # Factory builders
+│ └── grpc.go # Package exports and registration
+└── transport.go # Main package exports
+```
+
+### Core Interfaces
+
+#### Client Interface
+```go
+type Client interface {
+ Call(ctx context.Context, msg *Message) (*Response, error)
+ Stream(ctx context.Context, msg *Message) (StreamReader, error)
+ Protocol() Protocol
+ Close() error
+}
+```
+
+#### Server Interface
+```go
+type Server interface {
+ Serve() error
+ Stop(ctx context.Context) error
+ RegisterHandler(method string, handler Handler)
+ RegisterStreamHandler(method string, handler StreamHandler)
+ Protocol() Protocol
+}
+```
+
+## Usage Examples
+
+### Basic JSON-RPC Server
+```go
+import "seata-go-ai-transport"
+
+// Create server
+config := &transport.Config{
+ Protocol: transport.ProtocolJSONRPC,
+ Address: ":8080",
+ Options: map[string]interface{}{
+ "streaming": true, // Enable SSE
+ },
+}
+
+server, err := transport.CreateServer(config)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Register handler
+server.RegisterHandler("echo", func(ctx context.Context, req
*transport.Message) (*transport.Response, error) {
+ return transport.CreateResponse(map[string]interface{}{
+ "echo": string(req.Payload),
+ })
+})
+
+// Start server
+go server.Serve()
+```
+
+### Basic JSON-RPC Client
+```go
+// Create client
+config := &transport.Config{
+ Protocol: transport.ProtocolJSONRPC,
+ Address: "http://localhost:8080",
+ Options: map[string]interface{}{
+ "streaming": true,
+ },
+}
+
+client, err := transport.CreateClient(config)
+if err != nil {
+ log.Fatal(err)
+}
+defer client.Close()
+
+// Make call
+msg, _ := transport.MarshalMessage(map[string]string{"hello": "world"})
+msg.Method = "echo"
+
+resp, err := client.Call(context.Background(), msg)
+if err != nil {
+ log.Fatal(err)
+}
+
+var result map[string]interface{}
+transport.UnmarshalResponse(resp, &result)
+fmt.Println(result)
+```
+
+### Streaming Example
+```go
+// Register streaming handler
+server.RegisterStreamHandler("stream", func(ctx context.Context, req
*transport.Message, stream transport.StreamWriter) error {
+ for i := 0; i < 5; i++ {
+ resp, _ := transport.CreateStreamResponse(map[string]int{"count": i},
i == 4)
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ time.Sleep(time.Second)
+ }
+ return nil
+})
+
+// Client streaming call
+streamMsg, _ := transport.MarshalMessage(nil)
+streamMsg.Method = "stream"
+
+stream, err := client.Stream(context.Background(), streamMsg)
+if err != nil {
+ log.Fatal(err)
+}
+defer stream.Close()
+
+for {
+ resp, err := stream.Recv()
+ if err != nil {
+ break
+ }
+ if resp.Done {
+ break
+ }
+ fmt.Printf("Stream data: %s\n", string(resp.Data))
+}
+```
+
+### gRPC Usage
+```go
+// Create gRPC server
+config := &transport.Config{
+ Protocol: transport.ProtocolGRPC,
+ Address: ":9090",
+ Options: map[string]interface{}{
+ "insecure": true,
+ },
+}
+
+server, err := transport.CreateServer(config)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Register methods (requires proto definitions)
+grpcServer := server.(*grpc.Server)
+grpcServer.RegisterMethod("Ping", grpc.CreateUnaryMethodInfo(
+ "/service/Ping",
+ &PingRequest{},
+ &PongResponse{},
+))
+
+// Start server
+go server.Serve()
+```
+
+### Direct Usage (Without Registry)
+```go
+import (
+ "seata-go-ai-transport/jsonrpc"
+ "seata-go-ai-transport/grpc"
+)
+
+// Direct JSON-RPC client
+jsonClient := jsonrpc.NewSSEJSONRPCClient("http://localhost:8080", "")
+defer jsonClient.Close()
+
+// Direct gRPC client
+grpcClient, err := grpc.NewGRPCClient("localhost:9090", true)
+if err != nil {
+ log.Fatal(err)
+}
+defer grpcClient.Close()
+```
+
+## Configuration Options
+
+### JSON-RPC Options
+```go
+Options: map[string]interface{}{
+ "endpoint": "http://localhost:8080", // Server endpoint
+ "stream_endpoint": "http://localhost:8080/stream", // SSE endpoint
+ "timeout": "30s", // Request timeout
+ "headers": map[string]string{ // Custom headers
+ "Authorization": "Bearer token",
+ },
+ "streaming": true, // Enable SSE streaming
+ "stream_path": "/stream", // SSE path for server
+ "read_timeout": "30s", // Server read timeout
+ "write_timeout": "30s", // Server write timeout
+ "idle_timeout": "120s", // Server idle timeout
+}
+```
+
+### gRPC Options
+```go
+Options: map[string]interface{}{
+ "target": "localhost:9090", // Server target
+ "insecure": true, // Use insecure connection
+ "timeout": "30s", // Connection timeout
+ "dial_options": []grpc.DialOption{...}, // Custom dial options
+ "server_options": []grpc.ServerOption{...}, // Custom server options
+ "methods": map[string]grpc.MethodInfo{ // Method definitions
+ "Ping": grpc.CreateUnaryMethodInfo(...),
+ },
+}
+```
+
+## Error Handling
+
+The transport layer provides structured error handling:
+
+```go
+// Check if error is transport-specific
+if transport.IsTransportError(err) {
+ if transportErr, ok := transport.GetTransportError(err); ok {
+ fmt.Printf("Protocol: %s, Code: %d, Message: %s\n",
+ transportErr.Protocol, transportErr.Code, transportErr.Message)
+ }
+}
+
+// JSON-RPC specific errors
+if jsonrpc.IsJSONRPCError(err) {
+ if jsonErr, ok := jsonrpc.GetJSONRPCError(err); ok {
+ fmt.Printf("JSON-RPC Error: %d - %s\n", jsonErr.Code, jsonErr.Message)
+ }
+}
+```
+
+## Implementation Details
+
+### JSON-RPC Features
+- **HTTP/1.1 Transport**: Standard HTTP POST requests
+- **SSE Streaming**: Server-Sent Events for real-time streaming
+- **JSON-RPC 2.0 Compliance**: Full specification compliance
+- **Error Propagation**: Standard JSON-RPC error codes
+- **Content Negotiation**: Proper HTTP headers and content types
+
+### gRPC Features
+- **Server Streaming**: Full server streaming RPC support
+- **Protocol Buffers**: Integration with existing protobuf definitions
+- **Connection Management**: Proper connection lifecycle
+- **Metadata Support**: gRPC metadata as transport headers
+- **Graceful Shutdown**: Clean server shutdown with context cancellation
+
+### Streaming Implementations
+- **JSON-RPC**: Uses SSE (Server-Sent Events) over HTTP
+- **gRPC**: Uses native gRPC server streaming RPCs
+- **Common Interface**: Both implement the same `StreamReader`/`StreamWriter`
interfaces
+- **Error Handling**: Stream-specific error propagation
+
+## Thread Safety
+
+Thread-safety guarantees by component:
+- **Clients**: Safe for concurrent use
+- **Servers**: Handle multiple concurrent connections
+- **Streams**: Individual streams are not thread-safe (by design)
+- **Registry**: Thread-safe protocol registration and lookup
+
+## Extension Points
+
+To add a new transport protocol:
+
+1. Implement the `Client` and `Server` interfaces
+2. Create `ClientBuilder` and `ServerBuilder` implementations
+3. Register builders in package `init()` function
+4. Add protocol constant to `common/types.go`
+
+Example:
+```go
+func init() {
+ common.RegisterClientBuilder(common.ProtocolWebSocket, &WSClientBuilder{})
+ common.RegisterServerBuilder(common.ProtocolWebSocket, &WSServerBuilder{})
+}
+```
\ No newline at end of file
diff --git a/transport/common/errors.go b/transport/common/errors.go
new file mode 100644
index 00000000..4a2c9889
--- /dev/null
+++ b/transport/common/errors.go
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "errors"
+ "fmt"
+)
+
+var (
+ // ErrProtocolNotSupported indicates the protocol is not supported
+ ErrProtocolNotSupported = errors.New("protocol not supported")
+
+ // ErrClientClosed indicates the client is already closed
+ ErrClientClosed = errors.New("client is closed")
+
+ // ErrServerClosed indicates the server is already closed
+ ErrServerClosed = errors.New("server is closed")
+
+ // ErrStreamClosed indicates the stream is already closed
+ ErrStreamClosed = errors.New("stream is closed")
+
+ // ErrInvalidConfig indicates invalid configuration
+ ErrInvalidConfig = errors.New("invalid configuration")
+
+ // ErrHandlerNotFound indicates handler not found for method
+ ErrHandlerNotFound = errors.New("handler not found")
+
+ // ErrTimeout indicates operation timeout
+ ErrTimeout = errors.New("operation timeout")
+
+ // ErrConnectionFailed indicates connection failed
+ ErrConnectionFailed = errors.New("connection failed")
+)
+
+// TransportError represents a transport-specific error
+type TransportError struct {
+ Protocol Protocol
+ Code int
+ Message string
+ Cause error
+}
+
+// Error implements the error interface
+func (e *TransportError) Error() string {
+ if e.Cause != nil {
+ return fmt.Sprintf("%s transport error [%d]: %s: %v",
e.Protocol, e.Code, e.Message, e.Cause)
+ }
+ return fmt.Sprintf("%s transport error [%d]: %s", e.Protocol, e.Code,
e.Message)
+}
+
+// Unwrap returns the cause error
+func (e *TransportError) Unwrap() error {
+ return e.Cause
+}
+
+// NewTransportError creates a new transport error
+func NewTransportError(protocol Protocol, code int, message string, cause
error) *TransportError {
+ return &TransportError{
+ Protocol: protocol,
+ Code: code,
+ Message: message,
+ Cause: cause,
+ }
+}
+
+// IsTransportError reports whether err, or any error it wraps, is a *TransportError
+func IsTransportError(err error) bool {
+ var te *TransportError
+ return errors.As(err, &te)
+}
+
+// GetTransportError returns the first *TransportError found in err's chain and whether one was found
+func GetTransportError(err error) (*TransportError, bool) {
+ var te *TransportError
+ ok := errors.As(err, &te)
+ return te, ok
+}
diff --git a/transport/common/registry.go b/transport/common/registry.go
new file mode 100644
index 00000000..86f64a72
--- /dev/null
+++ b/transport/common/registry.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Registry manages transport builders
+type Registry struct {
+ mu sync.RWMutex
+ clientBuilders map[Protocol]ClientBuilder
+ serverBuilders map[Protocol]ServerBuilder
+}
+
+// NewRegistry creates a new transport registry
+func NewRegistry() *Registry {
+ return &Registry{
+ clientBuilders: make(map[Protocol]ClientBuilder),
+ serverBuilders: make(map[Protocol]ServerBuilder),
+ }
+}
+
+// RegisterClientBuilder registers a client builder for a protocol, replacing any previous one
+func (r *Registry) RegisterClientBuilder(protocol Protocol, builder
ClientBuilder) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.clientBuilders[protocol] = builder
+}
+
+// RegisterServerBuilder registers a server builder for a protocol, replacing any previous one
+func (r *Registry) RegisterServerBuilder(protocol Protocol, builder
ServerBuilder) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.serverBuilders[protocol] = builder
+}
+
+// GetClientBuilder gets a client builder for a protocol
+func (r *Registry) GetClientBuilder(protocol Protocol) (ClientBuilder, error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ builder, exists := r.clientBuilders[protocol]
+ if !exists {
+ return nil, fmt.Errorf("no client builder registered for
protocol %s", protocol)
+ }
+ return builder, nil
+}
+
+// GetServerBuilder gets a server builder for a protocol
+func (r *Registry) GetServerBuilder(protocol Protocol) (ServerBuilder, error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ builder, exists := r.serverBuilders[protocol]
+ if !exists {
+ return nil, fmt.Errorf("no server builder registered for
protocol %s", protocol)
+ }
+ return builder, nil
+}
+
+// CreateClient builds a client with the builder registered for config.Protocol; config must be non-nil
+func (r *Registry) CreateClient(config *Config) (Client, error) {
+ builder, err := r.GetClientBuilder(config.Protocol)
+ if err != nil {
+ return nil, err
+ }
+ return builder.Build(config)
+}
+
+// CreateServer builds a server with the builder registered for config.Protocol; config must be non-nil
+func (r *Registry) CreateServer(config *Config) (Server, error) {
+ builder, err := r.GetServerBuilder(config.Protocol)
+ if err != nil {
+ return nil, err
+ }
+ return builder.Build(config)
+}
+
+// SupportedProtocols returns, in unspecified order, the protocols that have a registered client builder
+func (r *Registry) SupportedProtocols() []Protocol {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ protocols := make([]Protocol, 0, len(r.clientBuilders))
+ for protocol := range r.clientBuilders {
+ protocols = append(protocols, protocol)
+ }
+ return protocols
+}
+
+// Default registry instance
+var defaultRegistry = NewRegistry()
+
+// RegisterClientBuilder registers a client builder in the default registry
+func RegisterClientBuilder(protocol Protocol, builder ClientBuilder) {
+ defaultRegistry.RegisterClientBuilder(protocol, builder)
+}
+
+// RegisterServerBuilder registers a server builder in the default registry
+func RegisterServerBuilder(protocol Protocol, builder ServerBuilder) {
+ defaultRegistry.RegisterServerBuilder(protocol, builder)
+}
+
+// CreateClient creates a client using the default registry
+func CreateClient(config *Config) (Client, error) {
+ return defaultRegistry.CreateClient(config)
+}
+
+// CreateServer creates a server using the default registry
+func CreateServer(config *Config) (Server, error) {
+ return defaultRegistry.CreateServer(config)
+}
+
+// GetSupportedProtocols returns the protocols that have a client builder in the default registry
+func GetSupportedProtocols() []Protocol {
+ return defaultRegistry.SupportedProtocols()
+}
diff --git a/transport/common/types.go b/transport/common/types.go
new file mode 100644
index 00000000..8932420e
--- /dev/null
+++ b/transport/common/types.go
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "context"
+ "io"
+)
+
+// Protocol represents the transport protocol type
+type Protocol int
+
+const (
+ ProtocolGRPC Protocol = iota
+ ProtocolJSONRPC
+)
+
+func (p Protocol) String() string {
+ switch p {
+ case ProtocolGRPC:
+ return "grpc"
+ case ProtocolJSONRPC:
+ return "jsonrpc"
+ default:
+ return "unknown"
+ }
+}
+
+// StreamType represents the streaming capability
+type StreamType int
+
+const (
+ StreamTypeUnary StreamType = iota
+ StreamTypeServerStream
+ StreamTypeClientStream
+ StreamTypeBiDirStream
+)
+
+// Message represents a generic transport message
+type Message struct {
+ Method string
+ Payload []byte
+ Headers map[string]string
+}
+
+// Response represents a generic transport response
+type Response struct {
+ Data []byte
+ Headers map[string]string
+ Error error
+}
+
+// StreamResponse represents a streaming response
+type StreamResponse struct {
+ Data []byte
+ Headers map[string]string
+ Done bool
+ Error error
+}
+
+// Client represents a transport client interface
+type Client interface {
+ // Call makes a unary RPC call
+ Call(ctx context.Context, msg *Message) (*Response, error)
+
+ // Stream makes a streaming RPC call
+ Stream(ctx context.Context, msg *Message) (StreamReader, error)
+
+ // Protocol returns the underlying protocol
+ Protocol() Protocol
+
+ // Close closes the client connection
+ Close() error
+}
+
+// Server represents a transport server interface
+type Server interface {
+ // Serve starts the server
+ Serve() error
+
+ // Stop stops the server gracefully
+ Stop(ctx context.Context) error
+
+ // RegisterHandler registers a handler for a method
+ RegisterHandler(method string, handler Handler)
+
+ // RegisterStreamHandler registers a streaming handler
+ RegisterStreamHandler(method string, handler StreamHandler)
+
+ // Protocol returns the underlying protocol
+ Protocol() Protocol
+}
+
+// Handler represents a unary RPC handler
+type Handler func(ctx context.Context, req *Message) (*Response, error)
+
+// StreamHandler represents a streaming RPC handler
+type StreamHandler func(ctx context.Context, req *Message, stream
StreamWriter) error
+
+// StreamReader provides methods to read from a stream
+type StreamReader interface {
+ // Recv receives the next message from the stream
+ Recv() (*StreamResponse, error)
+
+ // Close closes the stream
+ Close() error
+}
+
+// StreamWriter provides methods to write to a stream
+type StreamWriter interface {
+ // Send sends a message to the stream
+ Send(resp *StreamResponse) error
+
+ // Close closes the stream
+ Close() error
+}
+
+// Config represents transport configuration
+type Config struct {
+ Protocol Protocol `json:"protocol"`
+ Address string `json:"address"`
+ Options map[string]interface{} `json:"options,omitempty"`
+}
+
+// ClientBuilder builds transport clients
+type ClientBuilder interface {
+ // Build creates a new client with the given config
+ Build(config *Config) (Client, error)
+
+ // Protocol returns the protocol this builder supports
+ Protocol() Protocol
+}
+
+// ServerBuilder builds transport servers
+type ServerBuilder interface {
+ // Build creates a new server with the given config
+ Build(config *Config) (Server, error)
+
+ // Protocol returns the protocol this builder supports
+ Protocol() Protocol
+}
+
+var _ io.Closer = (StreamReader)(nil)
+var _ io.Closer = (StreamWriter)(nil)
diff --git a/transport/common/utils.go b/transport/common/utils.go
new file mode 100644
index 00000000..a73dc9a2
--- /dev/null
+++ b/transport/common/utils.go
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// MarshalMessage JSON-encodes v into the Payload of a new Message; Method is left empty for the caller to set
+func MarshalMessage(v interface{}) (*Message, error) {
+ data, err := json.Marshal(v)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal message: %w", err)
+ }
+
+ return &Message{
+ Payload: data,
+ Headers: make(map[string]string),
+ }, nil
+}
+
+// UnmarshalResponse returns resp.Error if it is set; otherwise it JSON-decodes resp.Data into v
+func UnmarshalResponse(resp *Response, v interface{}) error {
+ if resp.Error != nil {
+ return resp.Error
+ }
+
+ if err := json.Unmarshal(resp.Data, v); err != nil {
+ return fmt.Errorf("failed to unmarshal response: %w", err)
+ }
+
+ return nil
+}
+
+// UnmarshalStreamResponse returns resp.Error if set, leaves v untouched when resp.Done, else JSON-decodes resp.Data into v
+func UnmarshalStreamResponse(resp *StreamResponse, v interface{}) error {
+ if resp.Error != nil {
+ return resp.Error
+ }
+
+ if resp.Done {
+ return nil
+ }
+
+ if err := json.Unmarshal(resp.Data, v); err != nil {
+ return fmt.Errorf("failed to unmarshal stream response: %w",
err)
+ }
+
+ return nil
+}
+
+// CreateResponse creates a response with JSON marshaled data
+func CreateResponse(v interface{}) (*Response, error) {
+ data, err := json.Marshal(v)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal response: %w", err)
+ }
+
+ return &Response{
+ Data: data,
+ Headers: make(map[string]string),
+ }, nil
+}
+
+// CreateStreamResponse builds a StreamResponse; v is JSON-encoded into Data only when v is non-nil and done is false
+func CreateStreamResponse(v interface{}, done bool) (*StreamResponse, error) {
+ var data []byte
+ var err error
+
+ if v != nil && !done {
+ data, err = json.Marshal(v)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal stream
response: %w", err)
+ }
+ }
+
+ return &StreamResponse{
+ Data: data,
+ Headers: make(map[string]string),
+ Done: done,
+ }, nil
+}
+
+// CreateErrorResponse creates an error response
+func CreateErrorResponse(err error) *Response {
+ return &Response{
+ Data: nil,
+ Headers: make(map[string]string),
+ Error: err,
+ }
+}
+
+// CreateErrorStreamResponse creates a terminal (Done) stream response carrying err
+func CreateErrorStreamResponse(err error) *StreamResponse {
+ return &StreamResponse{
+ Data: nil,
+ Headers: make(map[string]string),
+ Done: true,
+ Error: err,
+ }
+}
+
+// SetHeader sets a header in message
+func (m *Message) SetHeader(key, value string) {
+ if m.Headers == nil {
+ m.Headers = make(map[string]string)
+ }
+ m.Headers[key] = value
+}
+
+// GetHeader gets a header from message
+func (m *Message) GetHeader(key string) string {
+ if m.Headers == nil {
+ return ""
+ }
+ return m.Headers[key]
+}
+
+// SetHeader sets a header in response
+func (r *Response) SetHeader(key, value string) {
+ if r.Headers == nil {
+ r.Headers = make(map[string]string)
+ }
+ r.Headers[key] = value
+}
+
+// GetHeader gets a header from response
+func (r *Response) GetHeader(key string) string {
+ if r.Headers == nil {
+ return ""
+ }
+ return r.Headers[key]
+}
+
+// SetHeader sets a header in stream response
+func (sr *StreamResponse) SetHeader(key, value string) {
+ if sr.Headers == nil {
+ sr.Headers = make(map[string]string)
+ }
+ sr.Headers[key] = value
+}
+
+// GetHeader gets a header from stream response
+func (sr *StreamResponse) GetHeader(key string) string {
+ if sr.Headers == nil {
+ return ""
+ }
+ return sr.Headers[key]
+}
diff --git a/transport/go.mod b/transport/go.mod
new file mode 100644
index 00000000..a3b0eeb1
--- /dev/null
+++ b/transport/go.mod
@@ -0,0 +1,15 @@
+module seata-go-ai-transport
+
+go 1.24.5
+
+require (
+ google.golang.org/grpc v1.74.2
+ google.golang.org/protobuf v1.36.6
+)
+
+require (
+ golang.org/x/net v0.40.0 // indirect
+ golang.org/x/sys v0.33.0 // indirect
+ golang.org/x/text v0.25.0 // indirect
+ google.golang.org/genproto/googleapis/rpc
v0.0.0-20250528174236-200df99c418a // indirect
+)
diff --git a/transport/go.sum b/transport/go.sum
new file mode 100644
index 00000000..e5226a87
--- /dev/null
+++ b/transport/go.sum
@@ -0,0 +1,34 @@
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod
h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+go.opentelemetry.io/auto/sdk v1.1.0
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/otel v1.36.0
h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
+go.opentelemetry.io/otel v1.36.0/go.mod
h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
+go.opentelemetry.io/otel/metric v1.36.0
h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
+go.opentelemetry.io/otel/metric v1.36.0/go.mod
h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
+go.opentelemetry.io/otel/sdk v1.36.0
h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
+go.opentelemetry.io/otel/sdk v1.36.0/go.mod
h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
+go.opentelemetry.io/otel/sdk/metric v1.36.0
h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
+go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod
h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
+go.opentelemetry.io/otel/trace v1.36.0
h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
+go.opentelemetry.io/otel/trace v1.36.0/go.mod
h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
+golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
+golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
+golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
+golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
+golang.org/x/text v0.25.0/go.mod
h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a
h1:v2PbRU4K3llS09c7zodFpNePeamkAwG3mPrAery9VeE=
+google.golang.org/genproto/googleapis/rpc
v0.0.0-20250528174236-200df99c418a/go.mod
h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
+google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
+google.golang.org/grpc v1.74.2/go.mod
h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
+google.golang.org/protobuf v1.36.6
h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+google.golang.org/protobuf v1.36.6/go.mod
h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
diff --git a/transport/grpc/builder.go b/transport/grpc/builder.go
new file mode 100644
index 00000000..bde23031
--- /dev/null
+++ b/transport/grpc/builder.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "seata-go-ai-transport/common"
+)
+
+// ClientBuilder implements common.ClientBuilder for gRPC. It has no fields; Build works purely from the config it is given.
+type ClientBuilder struct{}
+
+var _ common.ClientBuilder = (*ClientBuilder)(nil) // compile-time interface check
+
+// Build creates a gRPC client from a generic transport config.
+//
+// config.Address is the default dial target; the client defaults to insecure
+// transport and a 30 second timeout. The following config.Options keys are
+// honoured when they hold a value of the listed type (values of any other
+// type are ignored):
+//
+//   - "target" (string): overrides the dial target; an empty value falls
+//     back to config.Address
+//   - "insecure" (bool): toggles insecure transport
+//   - "timeout" (string or time.Duration): a string is parsed with
+//     time.ParseDuration and, if it cannot be parsed, the default is kept
+//   - "dial_options" ([]grpc.DialOption): extra dial options
+//   - "methods" (map[string]MethodInfo): the method table
+//
+// It returns an error when config is nil, when config.Protocol is not
+// common.ProtocolGRPC, or when NewClient fails.
+func (b *ClientBuilder) Build(config *common.Config) (common.Client, error) {
+	// Guard against a nil config instead of panicking on the field access.
+	if config == nil {
+		return nil, fmt.Errorf("gRPC client config is nil")
+	}
+	if config.Protocol != common.ProtocolGRPC {
+		return nil, fmt.Errorf("invalid protocol %s for gRPC client", config.Protocol)
+	}
+
+	clientConfig := &ClientConfig{
+		Target:   config.Address,
+		Insecure: true, // Default to insecure for simplicity
+		Timeout:  30 * time.Second,
+		Methods:  make(map[string]MethodInfo),
+	}
+
+	var dialOpts []grpc.DialOption
+
+	// Indexing a nil map is safe, so config.Options needs no nil check.
+	if target, ok := config.Options["target"].(string); ok {
+		clientConfig.Target = target
+	}
+	if ins, ok := config.Options["insecure"].(bool); ok {
+		clientConfig.Insecure = ins
+	}
+	switch timeout := config.Options["timeout"].(type) {
+	case string:
+		// Best effort: an unparsable string keeps the default timeout.
+		if parsed, err := time.ParseDuration(timeout); err == nil {
+			clientConfig.Timeout = parsed
+		}
+	case time.Duration:
+		clientConfig.Timeout = timeout
+	}
+	if options, ok := config.Options["dial_options"].([]grpc.DialOption); ok {
+		dialOpts = append(dialOpts, options...)
+	}
+	if methods, ok := config.Options["methods"].(map[string]MethodInfo); ok {
+		clientConfig.Methods = methods
+	}
+
+	// Use address as target if not specified in options
+	if clientConfig.Target == "" {
+		clientConfig.Target = config.Address
+	}
+
+	clientConfig.Options = dialOpts
+
+	return NewClient(clientConfig)
+}
+
+// Protocol returns the protocol this builder supports, which is always common.ProtocolGRPC.
+func (b *ClientBuilder) Protocol() common.Protocol {
+ return common.ProtocolGRPC
+}
+
+// ServerBuilder implements common.ServerBuilder for gRPC. It has no fields; Build works purely from the config it is given.
+type ServerBuilder struct{}
+
+var _ common.ServerBuilder = (*ServerBuilder)(nil) // compile-time interface check
+
+// Build creates a gRPC server from a generic transport config.
+//
+// config.Address becomes the listen address. Two config.Options keys are
+// honoured when they hold a value of the listed type (other types are
+// ignored): "server_options" ([]grpc.ServerOption) and "methods"
+// (map[string]MethodInfo).
+//
+// It returns an error when config is nil, when config.Protocol is not
+// common.ProtocolGRPC, or when NewServer fails.
+func (b *ServerBuilder) Build(config *common.Config) (common.Server, error) {
+	// Guard against a nil config instead of panicking on the field access.
+	if config == nil {
+		return nil, fmt.Errorf("gRPC server config is nil")
+	}
+	if config.Protocol != common.ProtocolGRPC {
+		return nil, fmt.Errorf("invalid protocol %s for gRPC server", config.Protocol)
+	}
+
+	serverConfig := &ServerConfig{
+		Address: config.Address,
+		Methods: make(map[string]MethodInfo),
+	}
+
+	var serverOpts []grpc.ServerOption
+
+	// Indexing a nil map is safe, so config.Options needs no nil check.
+	if options, ok := config.Options["server_options"].([]grpc.ServerOption); ok {
+		serverOpts = append(serverOpts, options...)
+	}
+	if methods, ok := config.Options["methods"].(map[string]MethodInfo); ok {
+		serverConfig.Methods = methods
+	}
+
+	serverConfig.Options = serverOpts
+
+	return NewServer(serverConfig)
+}
+
+// Protocol returns the protocol this builder supports, which is always common.ProtocolGRPC.
+func (b *ServerBuilder) Protocol() common.Protocol {
+ return common.ProtocolGRPC
+}
+
+// Helper functions for creating configurations
+
+// NewGRPCClientConfig builds a ClientConfig for target with a 30 second
+// timeout and an empty method table. When isInsecure is true the config is
+// flagged insecure and insecure transport credentials are added to Options.
+func NewGRPCClientConfig(target string, isInsecure bool) *ClientConfig {
+	dialOpts := []grpc.DialOption{}
+	if isInsecure {
+		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	}
+
+	return &ClientConfig{
+		Target:   target,
+		Insecure: isInsecure,
+		Timeout:  30 * time.Second,
+		Methods:  make(map[string]MethodInfo),
+		Options:  dialOpts,
+	}
+}
+
+// NewGRPCServerConfig builds a ServerConfig that listens on address and
+// starts with an empty method table and an empty option list.
+func NewGRPCServerConfig(address string) *ServerConfig {
+	cfg := new(ServerConfig)
+	cfg.Address = address
+	cfg.Methods = make(map[string]MethodInfo)
+	cfg.Options = []grpc.ServerOption{}
+	return cfg
+}
+
+// WithDialOptions appends opts to the client config's dial options and returns the same config so calls can be chained.
+func (c *ClientConfig) WithDialOptions(opts ...grpc.DialOption) *ClientConfig {
+ c.Options = append(c.Options, opts...)
+ return c
+}
+
+// WithServerOptions appends opts to the server config's option list and returns the same config so calls can be chained.
+func (c *ServerConfig) WithServerOptions(opts ...grpc.ServerOption)
*ServerConfig {
+ c.Options = append(c.Options, opts...)
+ return c
+}
+
+// WithMethods merges methods into the client config's method table, creating
+// the table if it is nil. Entries with the same name are overwritten. The
+// receiver is returned so calls can be chained.
+func (c *ClientConfig) WithMethods(methods map[string]MethodInfo) *ClientConfig {
+	table := c.Methods
+	if table == nil {
+		table = make(map[string]MethodInfo)
+		c.Methods = table
+	}
+	for name, info := range methods {
+		table[name] = info
+	}
+	return c
+}
+
+// WithMethods merges methods into the server config's method table, creating
+// the table if it is nil. Entries with the same name are overwritten. The
+// receiver is returned so calls can be chained.
+func (c *ServerConfig) WithMethods(methods map[string]MethodInfo) *ServerConfig {
+	table := c.Methods
+	if table == nil {
+		table = make(map[string]MethodInfo)
+		c.Methods = table
+	}
+	for name, info := range methods {
+		table[name] = info
+	}
+	return c
+}
diff --git a/transport/grpc/client.go b/transport/grpc/client.go
new file mode 100644
index 00000000..82587e98
--- /dev/null
+++ b/transport/grpc/client.go
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/proto"
+
+ "seata-go-ai-transport/common"
+)
+
+// Client implements the common.Client interface for gRPC on top of a single grpc.ClientConn.
+type Client struct {
+ conn *grpc.ClientConn // connection used for Invoke and NewStream; closed by Close
+ client interface{} // Generic gRPC client
+ closed int32 // set to 1 by Close; accessed with sync/atomic
+ mu sync.RWMutex // guards methods and client
+ methods map[string]MethodInfo // registered methods, keyed by the name used in Message.Method
+}
+
+// MethodInfo contains information about a gRPC method
+type MethodInfo struct {
+ FullName string // method path handed to the connection, e.g. "/package.Service/Method"
+ IsStreaming bool // true: must be called through Stream; false: through Call
+ InputType proto.Message // prototype cloned to build each request message
+ OutputType proto.Message // prototype cloned to build each response message
+}
+
+// ClientConfig represents gRPC client configuration
+type ClientConfig struct {
+ Target string `json:"target"` // target string passed to grpc.NewClient
+ Insecure bool `json:"insecure"` // when true, NewClient adds insecure transport credentials
+ Timeout time.Duration `json:"timeout"` // when > 0, NewClient adds grpc.WithTimeout
+ Options []grpc.DialOption `json:"-"` // extra dial options
+ Methods map[string]MethodInfo `json:"-"` // copied into the client's method table by NewClient
+}
+
+var _ common.Client = (*Client)(nil) // compile-time interface check
+
+// NewClient creates a gRPC client for config.Target.
+//
+// The dial options are config.Options, followed by insecure transport
+// credentials when config.Insecure is set and grpc.WithTimeout when
+// config.Timeout is positive. config.Methods is copied into the client's own
+// method table, so later changes to the config map do not affect the client.
+//
+// It returns an error when config is nil or when grpc.NewClient fails.
+func NewClient(config *ClientConfig) (*Client, error) {
+	// Guard against a nil config instead of panicking on the field access.
+	if config == nil {
+		return nil, fmt.Errorf("gRPC client config is nil")
+	}
+
+	// Copy the caller's options so the appends below can never write into
+	// spare capacity of the slice the caller still holds.
+	opts := make([]grpc.DialOption, 0, len(config.Options)+2)
+	opts = append(opts, config.Options...)
+
+	if config.Insecure {
+		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	}
+
+	if config.Timeout > 0 {
+		// NOTE(review): grpc.WithTimeout is deprecated and the grpc-go docs
+		// describe it as not supported by NewClient, so Timeout may have no
+		// effect here — confirm, and consider per-call context deadlines.
+		opts = append(opts, grpc.WithTimeout(config.Timeout))
+	}
+
+	conn, err := grpc.NewClient(config.Target, opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create gRPC connection: %w", err)
+	}
+
+	client := &Client{
+		conn:    conn,
+		methods: make(map[string]MethodInfo, len(config.Methods)),
+	}
+
+	// Ranging over a nil map is a no-op, so no nil check is required.
+	for method, info := range config.Methods {
+		client.methods[method] = info
+	}
+
+	return client, nil
+}
+
+// Call performs a unary RPC for msg.Method.
+//
+// The method must be registered and must not be a streaming method.
+// msg.Payload, when non-empty, is decoded with encoding/json into a clone of
+// the registered input type, and the reply is encoded with encoding/json into
+// Response.Data. A failure of the RPC itself is reported through
+// Response.Error together with a nil error; lookup, decode and encode
+// failures are returned as errors. After Close it returns
+// common.ErrClientClosed.
+//
+// NOTE(review): encoding/json is applied to proto.Message values; protobuf's
+// own JSON mapping lives in protojson — confirm this is intended.
+func (c *Client) Call(ctx context.Context, msg *common.Message) (*common.Response, error) {
+	if atomic.LoadInt32(&c.closed) == 1 {
+		return nil, common.ErrClientClosed
+	}
+
+	// Look the method up under the read lock only; the RPC runs unlocked.
+	c.mu.RLock()
+	info, registered := c.methods[msg.Method]
+	c.mu.RUnlock()
+
+	switch {
+	case !registered:
+		return nil, fmt.Errorf("method %s not registered", msg.Method)
+	case info.IsStreaming:
+		return nil, fmt.Errorf("method %s is a streaming method, use Stream() instead", msg.Method)
+	}
+
+	// Build the request from the registered prototype.
+	req := proto.Clone(info.InputType)
+	if len(msg.Payload) > 0 {
+		if err := json.Unmarshal(msg.Payload, req); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal request: %w", err)
+		}
+	}
+
+	reply := proto.Clone(info.OutputType)
+	if err := c.conn.Invoke(ctx, info.FullName, req, reply); err != nil {
+		// RPC failures travel inside the response, not as the Go error.
+		return &common.Response{
+			Headers: make(map[string]string),
+			Error:   err,
+		}, nil
+	}
+
+	encoded, err := json.Marshal(reply)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal response: %w", err)
+	}
+
+	return &common.Response{
+		Data:    encoded,
+		Headers: make(map[string]string),
+	}, nil
+}
+
+// Stream starts a server-streaming RPC for msg.Method and returns a reader
+// for the responses.
+//
+// The method must be registered as a streaming method. msg.Payload, when
+// non-empty, is decoded with encoding/json into a clone of the registered
+// input type. That single request is sent and the send side is closed before
+// the reader is returned. After Close it returns common.ErrClientClosed.
+func (c *Client) Stream(ctx context.Context, msg *common.Message) (common.StreamReader, error) {
+	if atomic.LoadInt32(&c.closed) == 1 {
+		return nil, common.ErrClientClosed
+	}
+
+	// Look the method up under the read lock only; the RPC runs unlocked.
+	c.mu.RLock()
+	info, registered := c.methods[msg.Method]
+	c.mu.RUnlock()
+
+	switch {
+	case !registered:
+		return nil, fmt.Errorf("method %s not registered", msg.Method)
+	case !info.IsStreaming:
+		return nil, fmt.Errorf("method %s is not a streaming method, use Call() instead", msg.Method)
+	}
+
+	// Build the request from the registered prototype.
+	req := proto.Clone(info.InputType)
+	if len(msg.Payload) > 0 {
+		if err := json.Unmarshal(msg.Payload, req); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal request: %w", err)
+		}
+	}
+
+	// One request in, many responses out.
+	desc := grpc.StreamDesc{
+		StreamName:    extractMethodName(info.FullName),
+		ServerStreams: true,
+		ClientStreams: false,
+	}
+
+	stream, err := c.conn.NewStream(ctx, &desc, info.FullName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create stream: %w", err)
+	}
+
+	if err := stream.SendMsg(req); err != nil {
+		_ = stream.CloseSend() // best effort; the send error is the one reported
+		return nil, fmt.Errorf("failed to send message: %w", err)
+	}
+
+	if err := stream.CloseSend(); err != nil {
+		return nil, fmt.Errorf("failed to close send: %w", err)
+	}
+
+	return NewGRPCStreamReader(ctx, stream, info.OutputType), nil
+}
+
+// Protocol returns the underlying protocol, which is always common.ProtocolGRPC.
+func (c *Client) Protocol() common.Protocol {
+ return common.ProtocolGRPC
+}
+
+// Close marks the client closed and closes the underlying connection,
+// returning the connection's close error. Only the first call does so; every
+// later call returns common.ErrClientClosed.
+func (c *Client) Close() error {
+	if firstClose := atomic.CompareAndSwapInt32(&c.closed, 0, 1); firstClose {
+		return c.conn.Close()
+	}
+	return common.ErrClientClosed
+}
+
+// RegisterMethod stores info under method in the client's method table, replacing any existing entry; it takes the write lock.
+func (c *Client) RegisterMethod(method string, info MethodInfo) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.methods[method] = info
+}
+
+// extractMethodName returns the part of fullMethod that follows its last
+// '/', e.g. "Method" for "/package.Service/Method". A string without any '/'
+// is returned unchanged.
+func extractMethodName(fullMethod string) string {
+	// Walk start back from the end until it sits just after a '/', or reaches
+	// the beginning of the string when there is no '/'.
+	start := len(fullMethod)
+	for start > 0 && fullMethod[start-1] != '/' {
+		start--
+	}
+	return fullMethod[start:]
+}
+
+// SetClient stores client as the underlying gRPC client (for custom clients); it takes the write lock.
+func (c *Client) SetClient(client interface{}) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.client = client
+}
+
+// GetClient returns the value last stored with SetClient, or nil if none was stored; it takes the read lock.
+func (c *Client) GetClient() interface{} {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.client
+}
+
+// GetConnection returns the underlying gRPC connection; it is returned as-is, also after Close.
+func (c *Client) GetConnection() *grpc.ClientConn {
+ return c.conn
+}
diff --git a/transport/grpc/grpc.go b/transport/grpc/grpc.go
new file mode 100644
index 00000000..015a5ac2
--- /dev/null
+++ b/transport/grpc/grpc.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "errors"
+ "reflect"
+
+ "google.golang.org/protobuf/proto"
+
+ "seata-go-ai-transport/common"
+)
+
+// Package initialization - registers the gRPC client and server builders with the common registry under common.ProtocolGRPC.
+func init() {
+ common.RegisterClientBuilder(common.ProtocolGRPC, &ClientBuilder{})
+ common.RegisterServerBuilder(common.ProtocolGRPC, &ServerBuilder{})
+}
+
+// Public constructors for direct usage
+
+// NewGRPCClient creates a gRPC client for target directly, using the default
+// configuration produced by NewGRPCClientConfig.
+func NewGRPCClient(target string, isInsecure bool) (*Client, error) {
+	return NewClient(NewGRPCClientConfig(target, isInsecure))
+}
+
+// NewGRPCServer creates a gRPC server for address directly, using the default
+// configuration produced by NewGRPCServerConfig.
+func NewGRPCServer(address string) (*Server, error) {
+	return NewServer(NewGRPCServerConfig(address))
+}
+
+// Utility functions for method registration
+
+// CreateMethodInfo assembles a MethodInfo from the method's full name, its
+// streaming flag and the request/response prototype messages.
+func CreateMethodInfo(fullName string, isStreaming bool, inputType, outputType proto.Message) MethodInfo {
+	var info MethodInfo
+	info.FullName = fullName
+	info.IsStreaming = isStreaming
+	info.InputType = inputType
+	info.OutputType = outputType
+	return info
+}
+
+// CreateUnaryMethodInfo returns the MethodInfo of a unary method, i.e. one
+// whose IsStreaming flag is false.
+func CreateUnaryMethodInfo(fullName string, inputType, outputType proto.Message) MethodInfo {
+	return MethodInfo{
+		FullName:    fullName,
+		IsStreaming: false,
+		InputType:   inputType,
+		OutputType:  outputType,
+	}
+}
+
+// CreateStreamingMethodInfo returns the MethodInfo of a streaming method,
+// i.e. one whose IsStreaming flag is true.
+func CreateStreamingMethodInfo(fullName string, inputType, outputType proto.Message) MethodInfo {
+	return MethodInfo{
+		FullName:    fullName,
+		IsStreaming: true,
+		InputType:   inputType,
+		OutputType:  outputType,
+	}
+}
+
+// Helper functions for error handling
+
+// IsGRPCError reports whether err, or any error reachable from it through
+// errors.Unwrap, exposes a GRPCStatus method, which is how gRPC status
+// errors identify themselves. It returns false for a nil error.
+//
+// The method is looked up by name rather than through an interface type
+// assertion: an assertion only matches an exact signature, and asserting
+// `GRPCStatus() interface{}` never matches an error whose GRPCStatus returns
+// a concrete status type, so the check would always be false.
+func IsGRPCError(err error) bool {
+	for ; err != nil; err = errors.Unwrap(err) {
+		if _, ok := reflect.TypeOf(err).MethodByName("GRPCStatus"); ok {
+			return true
+		}
+	}
+	return false
+}
+
+// CreateTransportError builds a common.TransportError for common.ProtocolGRPC from err, using code -1 and err.Error() as the message. err must be non-nil, because err.Error() is called unconditionally.
+func CreateTransportError(err error) *common.TransportError {
+ return common.NewTransportError(
+ common.ProtocolGRPC,
+ -1, // gRPC errors don't have simple integer codes
+ err.Error(),
+ err,
+ )
+}
diff --git a/transport/grpc/server.go b/transport/grpc/server.go
new file mode 100644
index 00000000..275981c5
--- /dev/null
+++ b/transport/grpc/server.go
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net"
+ "sync"
+ "sync/atomic"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/protobuf/proto"
+
+ "seata-go-ai-transport/common"
+)
+
+// Server implements the common.Server interface for gRPC on top of a grpc.Server.
+type Server struct {
+ server *grpc.Server // underlying server; exposed through GetServer
+ address string // TCP address Serve listens on
+ listener net.Listener // set by Serve once listening succeeds
+ handlers map[string]common.Handler // filled by RegisterHandler
+ streamHandlers map[string]common.StreamHandler // filled by RegisterStreamHandler
+ methods map[string]MethodInfo // filled by NewServer and RegisterMethod
+ mu sync.RWMutex // guards handlers, streamHandlers and methods
+ started int32 // set to 1 by Serve; accessed with sync/atomic
+ closed int32 // set to 1 by Stop; accessed with sync/atomic
+}
+
+// ServerConfig represents gRPC server configuration
+type ServerConfig struct {
+ Address string `json:"address"` // TCP address the server listens on
+ Options []grpc.ServerOption `json:"-"` // options passed to grpc.NewServer
+ Methods map[string]MethodInfo `json:"-"` // copied into the server's method table by NewServer
+}
+
+var _ common.Server = (*Server)(nil) // compile-time interface check
+
+// NewServer creates a gRPC server from config.
+//
+// config.Options is passed to grpc.NewServer and config.Methods is copied
+// into the server's own method table. No listener is opened here; that
+// happens in Serve. It returns an error when config is nil.
+func NewServer(config *ServerConfig) (*Server, error) {
+	// Guard against a nil config instead of panicking on the field access.
+	if config == nil {
+		return nil, fmt.Errorf("gRPC server config is nil")
+	}
+
+	server := &Server{
+		// A nil option slice is fine for a variadic call.
+		server:         grpc.NewServer(config.Options...),
+		address:        config.Address,
+		handlers:       make(map[string]common.Handler),
+		streamHandlers: make(map[string]common.StreamHandler),
+		methods:        make(map[string]MethodInfo, len(config.Methods)),
+	}
+
+	// Ranging over a nil map is a no-op, so no nil check is required.
+	for method, info := range config.Methods {
+		server.methods[method] = info
+	}
+
+	return server, nil
+}
+
+// Serve opens a TCP listener on the configured address and runs the gRPC
+// server on it, returning whatever grpc.Server.Serve returns. Calling Serve
+// on a server that is already started fails with an error. If the listener
+// cannot be opened the started flag is cleared again so Serve can be retried.
+func (s *Server) Serve() error {
+	if alreadyStarted := !atomic.CompareAndSwapInt32(&s.started, 0, 1); alreadyStarted {
+		return fmt.Errorf("server already started")
+	}
+
+	lis, err := net.Listen("tcp", s.address)
+	if err == nil {
+		s.listener = lis
+		return s.server.Serve(lis)
+	}
+
+	atomic.StoreInt32(&s.started, 0)
+	return fmt.Errorf("failed to listen on %s: %w", s.address, err)
+}
+
+// Stop shuts the server down. It starts a graceful stop in the background
+// and waits for it to finish; if ctx is done first, the server is stopped
+// forcefully and ctx.Err() is returned. Only the first call performs the
+// shutdown; later calls return common.ErrServerClosed.
+func (s *Server) Stop(ctx context.Context) error {
+	if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+		return common.ErrServerClosed
+	}
+
+	// stopped is closed once GracefulStop has returned.
+	stopped := make(chan struct{})
+	go func() {
+		defer close(stopped)
+		s.server.GracefulStop()
+	}()
+
+	select {
+	case <-ctx.Done():
+		// The caller ran out of patience: force the stop.
+		s.server.Stop()
+		return ctx.Err()
+	case <-stopped:
+		return nil
+	}
+}
+
+// RegisterHandler stores handler under method in the server's handler table, replacing any existing entry. NOTE(review): this only records the handler; how it is wired into the grpc.Server is not visible here.
+func (s *Server) RegisterHandler(method string, handler common.Handler) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.handlers[method] = handler
+}
+
+// RegisterStreamHandler stores handler under method in the server's stream handler table, replacing any existing entry. NOTE(review): this only records the handler; how it is wired into the grpc.Server is not visible here.
+func (s *Server) RegisterStreamHandler(method string, handler
common.StreamHandler) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.streamHandlers[method] = handler
+}
+
+// Protocol returns the underlying protocol, which is always common.ProtocolGRPC.
+func (s *Server) Protocol() common.Protocol {
+ return common.ProtocolGRPC
+}
+
+// RegisterMethod stores info under method in the server's method table, replacing any existing entry; it takes the write lock.
+func (s *Server) RegisterMethod(method string, info MethodInfo) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.methods[method] = info
+}
+
+// CreateUnaryHandler adapts a common.Handler to a grpc.UnaryHandler for
+// method.
+//
+// On every call the returned function looks up the method's MethodInfo,
+// encodes the incoming proto request with encoding/json into a
+// common.Message (headers taken from the incoming gRPC metadata), invokes
+// handler, and decodes the handler's response data into a clone of the
+// registered output type. An error returned by the handler, or carried in
+// the response's Error field, is returned as the RPC error. A nil response
+// with a nil error is treated as an empty reply.
+func (s *Server) CreateUnaryHandler(method string, handler common.Handler) grpc.UnaryHandler {
+	return func(ctx context.Context, req interface{}) (interface{}, error) {
+		// The lookup happens per call, so methods registered after the
+		// handler was created are still found.
+		s.mu.RLock()
+		methodInfo, exists := s.methods[method]
+		s.mu.RUnlock()
+
+		if !exists {
+			return nil, fmt.Errorf("method %s not registered", method)
+		}
+
+		// Convert proto message to JSON
+		reqProto, ok := req.(proto.Message)
+		if !ok {
+			return nil, fmt.Errorf("request is not a proto message")
+		}
+
+		reqData, err := json.Marshal(reqProto)
+		if err != nil {
+			return nil, fmt.Errorf("failed to marshal request: %w", err)
+		}
+
+		msg := &common.Message{
+			Method:  method,
+			Payload: reqData,
+			Headers: extractMetadata(ctx),
+		}
+
+		resp, err := handler(ctx, msg)
+		if err != nil {
+			return nil, err
+		}
+
+		output := proto.Clone(methodInfo.OutputType)
+
+		// A handler may return (nil, nil); answer with an empty reply
+		// instead of dereferencing the nil response.
+		if resp == nil {
+			return output, nil
+		}
+
+		if resp.Error != nil {
+			return nil, resp.Error
+		}
+
+		if len(resp.Data) > 0 {
+			if err := json.Unmarshal(resp.Data, output); err != nil {
+				return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+			}
+		}
+
+		return output, nil
+	}
+}
+
+// CreateStreamHandler adapts a common.StreamHandler to a grpc.StreamHandler
+// for method.
+//
+// On every call the returned function looks up the method's MethodInfo,
+// receives one request message from the stream, encodes it with
+// encoding/json into a common.Message (headers taken from the stream's
+// incoming metadata) and runs handler with a GRPCStreamWriter bound to the
+// stream. The handler's error is returned as the RPC result.
+func (s *Server) CreateStreamHandler(method string, handler common.StreamHandler) grpc.StreamHandler {
+	return func(srv interface{}, stream grpc.ServerStream) error {
+		s.mu.RLock()
+		info, registered := s.methods[method]
+		s.mu.RUnlock()
+		if !registered {
+			return fmt.Errorf("method %s not registered", method)
+		}
+
+		// Read the single request into a fresh copy of the input prototype.
+		req := proto.Clone(info.InputType)
+		if err := stream.RecvMsg(req); err != nil {
+			return fmt.Errorf("failed to receive message: %w", err)
+		}
+
+		payload, err := json.Marshal(req)
+		if err != nil {
+			return fmt.Errorf("failed to marshal request: %w", err)
+		}
+
+		streamCtx := stream.Context()
+		msg := &common.Message{
+			Method:  method,
+			Payload: payload,
+			Headers: extractMetadata(streamCtx),
+		}
+		writer := NewGRPCStreamWriter(stream, info.OutputType)
+
+		return handler(streamCtx, msg, writer)
+	}
+}
+
+// GetServer returns the underlying gRPC server created by NewServer.
+func (s *Server) GetServer() *grpc.Server {
+ return s.server
+}
+
+// extractMetadata flattens the incoming gRPC metadata of ctx into a header
+// map, keeping only the first value of each key. The result is never nil; it
+// is empty when ctx carries no incoming metadata.
+func extractMetadata(ctx context.Context) map[string]string {
+	headers := map[string]string{}
+
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		return headers
+	}
+
+	for key, vals := range md {
+		if len(vals) == 0 {
+			continue
+		}
+		headers[key] = vals[0]
+	}
+	return headers
+}
diff --git a/transport/grpc/stream_reader.go b/transport/grpc/stream_reader.go
new file mode 100644
index 00000000..b1fb7ce1
--- /dev/null
+++ b/transport/grpc/stream_reader.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "sync/atomic"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
+
+ "seata-go-ai-transport/common"
+)
+
+// GRPCStreamReader implements common.StreamReader for gRPC streams
+type GRPCStreamReader struct {
+ ctx context.Context // stored by the constructor; not read by the methods shown in this file
+ stream grpc.ClientStream // stream the responses are received from
+ outputType proto.Message // prototype cloned for every received message
+ closed int32 // set to 1 by Close; accessed with sync/atomic
+}
+
+var _ common.StreamReader = (*GRPCStreamReader)(nil) // compile-time interface check
+
+// NewGRPCStreamReader creates a reader over stream; outputType is the prototype cloned for each received message.
+func NewGRPCStreamReader(ctx context.Context, stream grpc.ClientStream,
outputType proto.Message) *GRPCStreamReader {
+ return &GRPCStreamReader{
+ ctx: ctx,
+ stream: stream,
+ outputType: outputType,
+ }
+}
+
+// Recv reads the next message from the stream.
+//
+// A received message is encoded with encoding/json into Data and returned
+// together with the stream's headers. End of stream (io.EOF) yields a
+// response with only Done set. Any other receive error, or a failure to
+// encode the message, yields a response with Done set and the error in its
+// Error field. The error return is non-nil only after Close, when it is
+// common.ErrStreamClosed.
+func (r *GRPCStreamReader) Recv() (*common.StreamResponse, error) {
+	if atomic.LoadInt32(&r.closed) == 1 {
+		return nil, common.ErrStreamClosed
+	}
+
+	// Receive into a fresh copy of the output prototype.
+	msg := proto.Clone(r.outputType)
+	switch recvErr := r.stream.RecvMsg(msg); {
+	case recvErr == io.EOF:
+		return &common.StreamResponse{Done: true}, nil
+	case recvErr != nil:
+		return &common.StreamResponse{Error: recvErr, Done: true}, nil
+	}
+
+	payload, err := json.Marshal(msg)
+	if err != nil {
+		return &common.StreamResponse{
+			Error: fmt.Errorf("failed to marshal response: %w", err),
+			Done:  true,
+		}, nil
+	}
+
+	return &common.StreamResponse{
+		Data:    payload,
+		Headers: r.extractHeaders(),
+	}, nil
+}
+
+// Close marks the reader closed and calls CloseSend on the stream, returning
+// its error. Only the first call does so; later calls return
+// common.ErrStreamClosed.
+//
+// NOTE(review): CloseSend only closes the sending direction. Confirm that
+// callers drain Recv or cancel the context so the stream is fully released.
+func (r *GRPCStreamReader) Close() error {
+	if firstClose := atomic.CompareAndSwapInt32(&r.closed, 0, 1); firstClose {
+		return r.stream.CloseSend()
+	}
+	return common.ErrStreamClosed
+}
+
+// extractHeaders flattens the stream's header metadata into a header map,
+// keeping only the first value of each key. The result is never nil; it is
+// empty when the header metadata cannot be obtained.
+func (r *GRPCStreamReader) extractHeaders() map[string]string {
+	headers := map[string]string{}
+
+	md, err := r.stream.Header()
+	if err != nil {
+		return headers
+	}
+
+	for key, vals := range md {
+		if len(vals) == 0 {
+			continue
+		}
+		headers[key] = vals[0]
+	}
+	return headers
+}
diff --git a/transport/grpc/stream_writer.go b/transport/grpc/stream_writer.go
new file mode 100644
index 00000000..f57c481e
--- /dev/null
+++ b/transport/grpc/stream_writer.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
+
+ "seata-go-ai-transport/common"
+)
+
+// GRPCStreamWriter implements common.StreamWriter for gRPC streams
+type GRPCStreamWriter struct {
+ stream grpc.ServerStream // stream the responses are sent on
+ outputType proto.Message // prototype cloned for every sent message
+ closed int32 // set to 1 by Close; accessed with sync/atomic
+ mu sync.Mutex // held for the duration of each Send
+}
+
+var _ common.StreamWriter = (*GRPCStreamWriter)(nil) // compile-time interface check
+
+// NewGRPCStreamWriter creates a writer over stream; outputType is the prototype cloned for each sent message.
+func NewGRPCStreamWriter(stream grpc.ServerStream, outputType proto.Message)
*GRPCStreamWriter {
+ return &GRPCStreamWriter{
+ stream: stream,
+ outputType: outputType,
+ }
+}
+
+// Send writes resp to the stream.
+//
+// A response carrying an Error is not transmitted; the error is returned to
+// the caller instead. A Done response without data sends nothing. Otherwise
+// resp.Data is decoded with encoding/json into a clone of the output
+// prototype and that message is sent. After Close it returns
+// common.ErrStreamClosed.
+func (w *GRPCStreamWriter) Send(resp *common.StreamResponse) error {
+	if atomic.LoadInt32(&w.closed) == 1 {
+		return common.ErrStreamClosed
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	switch {
+	case resp.Error != nil:
+		return resp.Error
+	case resp.Done && len(resp.Data) == 0:
+		// Pure end-of-stream marker: nothing to put on the wire.
+		return nil
+	}
+
+	msg := proto.Clone(w.outputType)
+	if len(resp.Data) > 0 {
+		if err := json.Unmarshal(resp.Data, msg); err != nil {
+			return fmt.Errorf("failed to unmarshal response data: %w", err)
+		}
+	}
+
+	if err := w.stream.SendMsg(msg); err != nil {
+		return fmt.Errorf("failed to send message: %w", err)
+	}
+	return nil
+}
+
+// Close marks the writer closed so that later Send calls fail with
+// common.ErrStreamClosed. The first call returns nil; later calls return
+// common.ErrStreamClosed.
+func (w *GRPCStreamWriter) Close() error {
+	if atomic.CompareAndSwapInt32(&w.closed, 0, 1) {
+		// gRPC server streams are automatically closed when the handler
+		// returns, so there is nothing else to do here.
+		return nil
+	}
+	return common.ErrStreamClosed
+}
+
+// SendError passes a Done response carrying err to Send and returns Send's result.
+func (w *GRPCStreamWriter) SendError(err error) error {
+ return w.Send(&common.StreamResponse{
+ Error: err,
+ Done: true,
+ })
+}
+
+// SendData encodes data with encoding/json (nil data is sent as an empty
+// payload) and passes it to Send together with the done flag. It returns an
+// error when encoding fails or when Send does.
+func (w *GRPCStreamWriter) SendData(data interface{}, done bool) error {
+	out := &common.StreamResponse{Done: done}
+
+	if data != nil {
+		encoded, err := json.Marshal(data)
+		if err != nil {
+			return fmt.Errorf("failed to marshal data: %w", err)
+		}
+		out.Data = encoded
+	}
+
+	return w.Send(out)
+}
diff --git a/transport/jsonrpc/builder.go b/transport/jsonrpc/builder.go
new file mode 100644
index 00000000..74807bd1
--- /dev/null
+++ b/transport/jsonrpc/builder.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "seata-go-ai-transport/common"
+)
+
+// ClientBuilder implements common.ClientBuilder for JSON-RPC.
+// It is stateless; its Build method returns the basic Client, whose Stream
+// method is not supported.
+type ClientBuilder struct{}
+
+// Compile-time check that *ClientBuilder satisfies common.ClientBuilder.
+var _ common.ClientBuilder = (*ClientBuilder)(nil)
+
+// Build constructs a basic JSON-RPC client from a generic transport config.
+//
+// config.Address is the default endpoint and the default timeout is 30s.
+// Recognized Options keys:
+//   - "endpoint" (string): overrides the endpoint
+//   - "timeout" (duration string or time.Duration): request timeout; a string
+//     that fails to parse is ignored
+//   - "headers" (map[string]string, or map[string]interface{} whose non-string
+//     values are skipped): default request headers
+//
+// An error is returned only when config.Protocol is not ProtocolJSONRPC.
+func (b *ClientBuilder) Build(config *common.Config) (common.Client, error) {
+	if config.Protocol != common.ProtocolJSONRPC {
+		return nil, fmt.Errorf("invalid protocol %s for JSON-RPC client", config.Protocol)
+	}
+
+	clientConfig := &ClientConfig{
+		Endpoint: config.Address,
+		Timeout:  30 * time.Second,
+		Headers:  make(map[string]string),
+	}
+
+	// Looking up keys in a nil Options map simply yields no match, so no
+	// separate nil check is required.
+	if endpoint, ok := config.Options["endpoint"].(string); ok {
+		clientConfig.Endpoint = endpoint
+	}
+
+	switch timeout := config.Options["timeout"].(type) {
+	case string:
+		if parsed, err := time.ParseDuration(timeout); err == nil {
+			clientConfig.Timeout = parsed
+		}
+	case time.Duration:
+		clientConfig.Timeout = timeout
+	}
+
+	switch headers := config.Options["headers"].(type) {
+	case map[string]interface{}:
+		for name, raw := range headers {
+			if value, isString := raw.(string); isString {
+				clientConfig.Headers[name] = value
+			}
+		}
+	case map[string]string:
+		for name, value := range headers {
+			clientConfig.Headers[name] = value
+		}
+	}
+
+	// An empty "endpoint" option falls back to the configured address.
+	if clientConfig.Endpoint == "" {
+		clientConfig.Endpoint = config.Address
+	}
+
+	return NewClient(clientConfig), nil
+}
+
+// Protocol reports the protocol this builder produces clients for, which is
+// always common.ProtocolJSONRPC.
+func (b *ClientBuilder) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// ServerBuilder implements common.ServerBuilder for JSON-RPC.
+// It is stateless; its Build method returns the basic Server.
+type ServerBuilder struct{}
+
+// Compile-time check that *ServerBuilder satisfies common.ServerBuilder.
+var _ common.ServerBuilder = (*ServerBuilder)(nil)
+
+// Build constructs a basic JSON-RPC server from a generic transport config.
+//
+// Defaults are 30s read timeout, 30s write timeout and 120s idle timeout.
+// The Options keys "read_timeout", "write_timeout" and "idle_timeout" each
+// accept a duration string or a time.Duration; a string that fails to parse
+// is ignored and the default is kept.
+//
+// An error is returned only when config.Protocol is not ProtocolJSONRPC.
+func (b *ServerBuilder) Build(config *common.Config) (common.Server, error) {
+	if config.Protocol != common.ProtocolJSONRPC {
+		return nil, fmt.Errorf("invalid protocol %s for JSON-RPC server", config.Protocol)
+	}
+
+	serverConfig := &ServerConfig{
+		Address:      config.Address,
+		ReadTimeout:  30 * time.Second,
+		WriteTimeout: 30 * time.Second,
+		IdleTimeout:  120 * time.Second,
+	}
+
+	// Each option key maps to the config field it overrides. Lookups in a
+	// nil Options map simply find nothing.
+	overrides := []struct {
+		key    string
+		target *time.Duration
+	}{
+		{"read_timeout", &serverConfig.ReadTimeout},
+		{"write_timeout", &serverConfig.WriteTimeout},
+		{"idle_timeout", &serverConfig.IdleTimeout},
+	}
+	for _, override := range overrides {
+		switch value := config.Options[override.key].(type) {
+		case string:
+			if parsed, err := time.ParseDuration(value); err == nil {
+				*override.target = parsed
+			}
+		case time.Duration:
+			*override.target = value
+		}
+	}
+
+	return NewServer(serverConfig), nil
+}
+
+// Protocol reports the protocol this builder produces servers for, which is
+// always common.ProtocolJSONRPC.
+func (b *ServerBuilder) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// NewClientConfig creates a client config from JSON
+func NewClientConfig(data []byte) (*ClientConfig, error) {
+ var config ClientConfig
+ if err := json.Unmarshal(data, &config); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal client config: %w",
err)
+ }
+ return &config, nil
+}
+
+// NewServerConfig creates a server config from JSON
+func NewServerConfig(data []byte) (*ServerConfig, error) {
+ var config ServerConfig
+ if err := json.Unmarshal(data, &config); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal server config: %w",
err)
+ }
+ return &config, nil
+}
diff --git a/transport/jsonrpc/client.go b/transport/jsonrpc/client.go
new file mode 100644
index 00000000..df1f7e16
--- /dev/null
+++ b/transport/jsonrpc/client.go
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "seata-go-ai-transport/common"
+)
+
+// Client implements the common.Client interface for JSON-RPC.
+// Requests are sent as HTTP POSTs to a single endpoint.
+type Client struct {
+ // endpoint is the URL every Call posts to.
+ endpoint string
+ // httpClient performs the HTTP round trips; its Timeout is set in NewClient.
+ httpClient *http.Client
+ // headers are applied to every outgoing request; guarded by mu.
+ headers map[string]string
+ // idCounter is incremented atomically to produce request IDs.
+ idCounter int64
+ // closed is set to 1 by Close; accessed atomically.
+ closed int32
+ // mu guards headers.
+ mu sync.RWMutex
+}
+
+// ClientConfig represents JSON-RPC client configuration.
+type ClientConfig struct {
+ // Endpoint is the URL requests are posted to.
+ Endpoint string `json:"endpoint"`
+ // Timeout bounds each HTTP request; NewClient substitutes 30s for zero.
+ Timeout time.Duration `json:"timeout"`
+ // Headers are default headers copied into the client at construction.
+ Headers map[string]string `json:"headers"`
+}
+
+// Compile-time check that *Client satisfies common.Client.
+var _ common.Client = (*Client)(nil)
+
+// NewClient builds a JSON-RPC client from config.
+//
+// A zero Timeout is replaced by 30 seconds. The header map is copied, so
+// later changes to config.Headers do not affect the client, and a
+// "Content-Type: application/json" header is added unless the caller
+// supplied one under exactly that key.
+func NewClient(config *ClientConfig) *Client {
+	httpClient := &http.Client{Timeout: config.Timeout}
+	if httpClient.Timeout == 0 {
+		httpClient.Timeout = 30 * time.Second
+	}
+
+	// Seed the default first; a caller-provided Content-Type overwrites it.
+	headers := map[string]string{"Content-Type": "application/json"}
+	for name, value := range config.Headers {
+		headers[name] = value
+	}
+
+	client := new(Client)
+	client.endpoint = config.Endpoint
+	client.httpClient = httpClient
+	client.headers = headers
+	return client
+}
+
+// Call performs a unary JSON-RPC call over HTTP POST and waits for the reply.
+//
+// msg.Payload, when non-empty, must be valid JSON; it is decoded and sent as
+// the request params. Client-level headers are applied first and msg.Headers
+// second, so per-message values win on conflict.
+//
+// A transport failure, a non-200 HTTP status or an unparseable body yields a
+// nil response and a non-nil error. A well-formed JSON-RPC error reply is not
+// reported as a Go error: it is placed in Response.Error and the returned
+// error is nil. Calling Call after Close returns common.ErrClientClosed.
+func (c *Client) Call(ctx context.Context, msg *common.Message) (*common.Response, error) {
+	if atomic.LoadInt32(&c.closed) == 1 {
+		return nil, common.ErrClientClosed
+	}
+
+	// Generate unique ID for the request
+	id := atomic.AddInt64(&c.idCounter, 1)
+
+	// Decode the raw payload so it is embedded as structured params.
+	var params interface{}
+	if len(msg.Payload) > 0 {
+		if err := json.Unmarshal(msg.Payload, &params); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal payload: %w", err)
+		}
+	}
+
+	req, err := NewRequest(msg.Method, params, id)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	// Marshal request to JSON
+	reqData, err := json.Marshal(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal request: %w", err)
+	}
+
+	// Binding ctx to the request lets the caller cancel the round trip.
+	httpReq, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, bytes.NewReader(reqData))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
+	}
+
+	// Client defaults first, then per-message headers so the latter override.
+	c.mu.RLock()
+	for k, v := range c.headers {
+		httpReq.Header.Set(k, v)
+	}
+	for k, v := range msg.Headers {
+		httpReq.Header.Set(k, v)
+	}
+	c.mu.RUnlock()
+
+	// Send request
+	httpResp, err := c.httpClient.Do(httpReq)
+	if err != nil {
+		return nil, fmt.Errorf("failed to send request: %w", err)
+	}
+	defer func(Body io.ReadCloser) {
+		_ = Body.Close()
+	}(httpResp.Body)
+
+	// The body is read before the status check so it can be included in the
+	// error message for non-200 replies.
+	respData, err := io.ReadAll(httpResp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response: %w", err)
+	}
+
+	if httpResp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("HTTP error: %d %s", httpResp.StatusCode, string(respData))
+	}
+
+	// Parse JSON-RPC response
+	var jsonResp JSONRPCResponse
+	if err := json.Unmarshal(respData, &jsonResp); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+	}
+
+	resp := &common.Response{
+		Headers: make(map[string]string),
+	}
+
+	// Only the first value of each HTTP response header is kept.
+	for k, values := range httpResp.Header {
+		if len(values) > 0 {
+			resp.Headers[k] = values[0]
+		}
+	}
+
+	// A JSON-RPC level error travels inside the response, not as a Go error.
+	if jsonResp.Error != nil {
+		resp.Error = jsonResp.Error
+		return resp, nil
+	}
+
+	resp.Data = jsonResp.Result
+
+	return resp, nil
+}
+
+// Stream makes a streaming RPC call (not supported for basic JSON-RPC)
+func (c *Client) Stream(ctx context.Context, msg *common.Message)
(common.StreamReader, error) {
+ return nil, fmt.Errorf("streaming not supported by basic JSON-RPC
client, use SSE client instead")
+}
+
+// Protocol reports the protocol spoken by this client, which is always
+// common.ProtocolJSONRPC.
+func (c *Client) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// Close marks the client as closed and releases idle HTTP connections.
+// A second Close returns common.ErrClientClosed.
+func (c *Client) Close() error {
+	// Only the caller that flips the flag from 0 to 1 performs the cleanup.
+	if atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
+		c.httpClient.CloseIdleConnections()
+		return nil
+	}
+	return common.ErrClientClosed
+}
+
+// SetHeader sets a default header that is applied to every subsequent
+// request, replacing any existing value stored under key.
+func (c *Client) SetHeader(key, value string) {
+ // Take the write lock: Call reads the header map under the read lock.
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.headers[key] = value
+}
+
+// RemoveHeader deletes a default header so it is no longer applied to
+// subsequent requests. Removing a key that is not set is a no-op.
+func (c *Client) RemoveHeader(key string) {
+ // Take the write lock: Call reads the header map under the read lock.
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ delete(c.headers, key)
+}
diff --git a/transport/jsonrpc/jsonrpc.go b/transport/jsonrpc/jsonrpc.go
new file mode 100644
index 00000000..fcc5915b
--- /dev/null
+++ b/transport/jsonrpc/jsonrpc.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "errors"
+ "seata-go-ai-transport/common"
+)
+
+// init registers this package's builders with the common registry under
+// common.ProtocolJSONRPC.
+func init() {
+	// The SSE-enabled builders are the defaults for JSON-RPC: they support
+	// both regular and streaming operations.
+	common.RegisterClientBuilder(common.ProtocolJSONRPC, &SSEClientBuilder{})
+	common.RegisterServerBuilder(common.ProtocolJSONRPC, &SSEServerBuilder{})
+}
+
+// Public constructors for direct usage
+
+// NewJSONRPCClient is a shortcut for NewClient with only an endpoint set.
+// The timeout is left at zero, which NewClient replaces with its default.
+func NewJSONRPCClient(endpoint string) *Client {
+	cfg := ClientConfig{
+		Endpoint: endpoint,
+		Headers:  map[string]string{},
+	}
+	return NewClient(&cfg)
+}
+
+// NewJSONRPCServer is a shortcut for NewServer with only an address set.
+// All timeouts are left at their zero values.
+func NewJSONRPCServer(address string) *Server {
+	cfg := ServerConfig{Address: address}
+	return NewServer(&cfg)
+}
+
+// Utility functions for working with JSON-RPC
+
+// IsJSONRPCError reports whether err, or any error it wraps, is a
+// *JSONRPCError.
+func IsJSONRPCError(err error) bool {
+	var target *JSONRPCError
+	return errors.As(err, &target)
+}
+
+// GetJSONRPCError returns the first *JSONRPCError found in err's chain.
+// The boolean is false, and the pointer nil, when there is none.
+func GetJSONRPCError(err error) (*JSONRPCError, bool) {
+	var extracted *JSONRPCError
+	if errors.As(err, &extracted) {
+		return extracted, true
+	}
+	return nil, false
+}
+
+// CreateTransportError converts a JSON-RPC error into a common.TransportError
+// tagged with ProtocolJSONRPC. The code and message are copied over and the
+// original error is passed along as the last argument.
+func CreateTransportError(jsonErr *JSONRPCError) *common.TransportError {
+	code, message := jsonErr.Code, jsonErr.Message
+	return common.NewTransportError(common.ProtocolJSONRPC, code, message, jsonErr)
+}
diff --git a/transport/jsonrpc/server.go b/transport/jsonrpc/server.go
new file mode 100644
index 00000000..e6d7a771
--- /dev/null
+++ b/transport/jsonrpc/server.go
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "seata-go-ai-transport/common"
+)
+
+// Server implements the common.Server interface for JSON-RPC.
+// It serves requests over HTTP and dispatches them by method name.
+type Server struct {
+ // address is the TCP address Serve listens on.
+ address string
+ // server is the underlying HTTP server; its handler is set up in NewServer.
+ server *http.Server
+ // handlers maps a method name to its unary handler; guarded by mu.
+ handlers map[string]common.Handler
+ // streamHandlers maps a method name to its streaming handler; guarded by mu.
+ streamHandlers map[string]common.StreamHandler
+ // mu guards handlers and streamHandlers.
+ mu sync.RWMutex
+ // started is set to 1 by Serve; accessed atomically.
+ started int32
+ // closed is set to 1 by Stop; accessed atomically.
+ closed int32
+}
+
+// ServerConfig represents JSON-RPC server configuration.
+// The three timeouts are passed unchanged to the underlying http.Server.
+type ServerConfig struct {
+ // Address is the TCP address to listen on.
+ Address string `json:"address"`
+ // ReadTimeout is copied to http.Server.ReadTimeout.
+ ReadTimeout time.Duration `json:"read_timeout"`
+ // WriteTimeout is copied to http.Server.WriteTimeout.
+ WriteTimeout time.Duration `json:"write_timeout"`
+ // IdleTimeout is copied to http.Server.IdleTimeout.
+ IdleTimeout time.Duration `json:"idle_timeout"`
+}
+
+// Compile-time check that *Server satisfies common.Server.
+var _ common.Server = (*Server)(nil)
+
+// NewServer builds a JSON-RPC server from config. The server does not start
+// listening until Serve is called. All requests under "/" are routed to the
+// JSON-RPC handler.
+func NewServer(config *ServerConfig) *Server {
+	mux := http.NewServeMux()
+
+	httpServer := &http.Server{
+		Addr:         config.Address,
+		Handler:      mux,
+		ReadTimeout:  config.ReadTimeout,
+		WriteTimeout: config.WriteTimeout,
+		IdleTimeout:  config.IdleTimeout,
+	}
+
+	srv := &Server{
+		address:        config.Address,
+		server:         httpServer,
+		handlers:       make(map[string]common.Handler),
+		streamHandlers: make(map[string]common.StreamHandler),
+	}
+
+	// The route is registered last because it needs the finished Server value.
+	mux.HandleFunc("/", srv.handleJSONRPC)
+
+	return srv
+}
+
+// Serve opens a TCP listener on the configured address and blocks serving
+// HTTP requests until the underlying server stops.
+//
+// It fails immediately if the server was already started. If the listener
+// cannot be opened, the started flag is reset so Serve may be retried.
+func (s *Server) Serve() error {
+	if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
+		return fmt.Errorf("server already started")
+	}
+
+	ln, listenErr := net.Listen("tcp", s.address)
+	if listenErr == nil {
+		return s.server.Serve(ln)
+	}
+
+	atomic.StoreInt32(&s.started, 0)
+	return fmt.Errorf("failed to listen on %s: %w", s.address, listenErr)
+}
+
+// Stop shuts the HTTP server down gracefully, bounded by ctx.
+// A second Stop returns common.ErrServerClosed.
+func (s *Server) Stop(ctx context.Context) error {
+	// Only the caller that flips the flag from 0 to 1 triggers the shutdown.
+	if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+		return s.server.Shutdown(ctx)
+	}
+	return common.ErrServerClosed
+}
+
+// RegisterHandler registers the unary handler for a JSON-RPC method,
+// replacing any handler previously registered under the same name.
+func (s *Server) RegisterHandler(method string, handler common.Handler) {
+ // Write lock: request dispatch reads the map under the read lock.
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.handlers[method] = handler
+}
+
+// RegisterStreamHandler registers the streaming handler for a method,
+// replacing any streaming handler previously registered under the same name.
+func (s *Server) RegisterStreamHandler(method string, handler
+common.StreamHandler) {
+ // Write lock: the handler maps are shared with request-serving goroutines.
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.streamHandlers[method] = handler
+}
+
+// Protocol reports the protocol served by this server, which is always
+// common.ProtocolJSONRPC.
+func (s *Server) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// handleJSONRPC handles JSON-RPC requests
+func (s *Server) handleJSONRPC(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // Set response headers
+ w.Header().Set("Content-Type", "application/json")
+
+ // Read request body
+ body, err := io.ReadAll(r.Body)
+ if err != nil {
+ s.writeErrorResponse(w, ParseError, nil)
+ return
+ }
+
+ // Parse JSON-RPC request
+ var req JSONRPCRequest
+ if err := json.Unmarshal(body, &req); err != nil {
+ s.writeErrorResponse(w, ParseError, nil)
+ return
+ }
+
+ // Validate JSON-RPC version
+ if req.JSONRPC != Version {
+ s.writeErrorResponse(w, InvalidRequest, req.ID)
+ return
+ }
+
+ // Handle request
+ s.handleRequest(w, r, &req)
+}
+
+// handleRequest dispatches one validated JSON-RPC request to its registered
+// handler and writes the HTTP reply.
+//
+// Notifications (requests for which req.IsNotification reports true) never
+// receive a JSON-RPC response object: they are answered with 204 No Content
+// whether the method is unknown, the handler fails, or it succeeds. Regular
+// requests get a JSON-RPC result or error object with HTTP 200. A handler
+// that returns a nil response with a nil error produces a null result.
+func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request, req *JSONRPCRequest) {
+	notification := req.IsNotification()
+
+	// Look the handler up under the read lock; registration takes the write lock.
+	s.mu.RLock()
+	handler, exists := s.handlers[req.Method]
+	s.mu.RUnlock()
+
+	if !exists {
+		if notification {
+			w.WriteHeader(http.StatusNoContent)
+		} else {
+			s.writeErrorResponse(w, MethodNotFound, req.ID)
+		}
+		return
+	}
+
+	// Create common message
+	msg := &common.Message{
+		Method:  req.Method,
+		Payload: req.Params,
+		Headers: make(map[string]string),
+	}
+
+	// Only the first value of each HTTP request header is forwarded.
+	for k, values := range r.Header {
+		if len(values) > 0 {
+			msg.Headers[k] = values[0]
+		}
+	}
+
+	// Call handler
+	resp, err := handler(r.Context(), msg)
+
+	// The handler's outcome is never reported back for a notification.
+	if notification {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+
+	if err != nil {
+		// Transport errors keep their own code and message; anything else is
+		// reported as an internal error.
+		var jsonErr *JSONRPCError
+		if transportErr, ok := common.GetTransportError(err); ok {
+			jsonErr = NewJSONRPCError(transportErr.Code, transportErr.Message, transportErr.Cause)
+		} else {
+			jsonErr = NewJSONRPCError(InternalErrorCode, err.Error(), nil)
+		}
+		s.writeErrorResponse(w, jsonErr, req.ID)
+		return
+	}
+
+	// Decode the handler's raw JSON so it is embedded as a structured result.
+	var result interface{}
+	if resp != nil && len(resp.Data) > 0 {
+		if err := json.Unmarshal(resp.Data, &result); err != nil {
+			s.writeErrorResponse(w, NewJSONRPCError(InternalErrorCode, "Failed to unmarshal result", err.Error()), req.ID)
+			return
+		}
+	}
+
+	jsonResp, err := NewResponse(result, req.ID)
+	if err != nil {
+		s.writeErrorResponse(w, NewJSONRPCError(InternalErrorCode, "Failed to create response", err.Error()), req.ID)
+		return
+	}
+
+	// Handler-supplied headers must be set before the status line is written.
+	if resp != nil {
+		for k, v := range resp.Headers {
+			w.Header().Set(k, v)
+		}
+	}
+
+	respData, err := json.Marshal(jsonResp)
+	if err != nil {
+		s.writeErrorResponse(w, NewJSONRPCError(InternalErrorCode, "Failed to marshal response", err.Error()), req.ID)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	// A failed write means the client is gone; nothing more can be reported.
+	_, _ = w.Write(respData)
+}
+
+// writeErrorResponse sends jsonErr to the client as a JSON-RPC error object
+// for the request identified by id. The HTTP status is 200, because JSON-RPC
+// errors are still HTTP 200. If the error object itself cannot be encoded, a
+// plain 500 is sent instead.
+func (s *Server) writeErrorResponse(w http.ResponseWriter, jsonErr *JSONRPCError, id interface{}) {
+	payload, err := json.Marshal(NewErrorResponse(jsonErr, id))
+	if err == nil {
+		w.WriteHeader(http.StatusOK)
+		w.Write(payload)
+		return
+	}
+
+	// Last resort: the error response could not be marshalled.
+	http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+}
diff --git a/transport/jsonrpc/sse_builder.go b/transport/jsonrpc/sse_builder.go
new file mode 100644
index 00000000..ed081b7d
--- /dev/null
+++ b/transport/jsonrpc/sse_builder.go
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "fmt"
+ "time"
+
+ "seata-go-ai-transport/common"
+)
+
+// SSEClientBuilder implements common.ClientBuilder for SSE-enabled JSON-RPC.
+// It is stateless; its Build method always returns an SSEClient.
+type SSEClientBuilder struct{}
+
+// Compile-time check that *SSEClientBuilder satisfies common.ClientBuilder.
+var _ common.ClientBuilder = (*SSEClientBuilder)(nil)
+
+// Build creates a new SSE JSON-RPC client with the given config
+func (b *SSEClientBuilder) Build(config *common.Config) (common.Client, error)
{
+ if config.Protocol != common.ProtocolJSONRPC {
+ return nil, fmt.Errorf("invalid protocol %s for SSE JSON-RPC
client", config.Protocol)
+ }
+
+ clientConfig := &ClientConfig{
+ Endpoint: config.Address,
+ Timeout: 30 * time.Second,
+ Headers: make(map[string]string),
+ }
+
+ sseConfig := &SSEClientConfig{
+ ClientConfig: clientConfig,
+ }
+
+ // Parse options
+ if config.Options != nil {
+ if endpoint, ok := config.Options["endpoint"].(string); ok {
+ clientConfig.Endpoint = endpoint
+ }
+ if streamEndpoint, ok :=
config.Options["stream_endpoint"].(string); ok {
+ sseConfig.StreamEndpoint = streamEndpoint
+ }
+ if timeoutStr, ok := config.Options["timeout"].(string); ok {
+ if timeout, err := time.ParseDuration(timeoutStr); err
== nil {
+ clientConfig.Timeout = timeout
+ }
+ }
+ if timeout, ok := config.Options["timeout"].(time.Duration); ok
{
+ clientConfig.Timeout = timeout
+ }
+ if headers, ok :=
config.Options["headers"].(map[string]interface{}); ok {
+ for k, v := range headers {
+ if s, ok := v.(string); ok {
+ clientConfig.Headers[k] = s
+ }
+ }
+ }
+ if headers, ok :=
config.Options["headers"].(map[string]string); ok {
+ for k, v := range headers {
+ clientConfig.Headers[k] = v
+ }
+ }
+ // Support streaming flag to determine if SSE should be used
+ if streaming, ok := config.Options["streaming"].(bool); ok &&
streaming {
+ return NewSSEClient(sseConfig), nil
+ }
+ }
+
+ // Use address as endpoint if not specified in options
+ if clientConfig.Endpoint == "" {
+ clientConfig.Endpoint = config.Address
+ }
+
+ // Return SSE client by default for JSON-RPC with streaming capability
+ return NewSSEClient(sseConfig), nil
+}
+
+// Protocol reports the protocol this builder produces clients for, which is
+// always common.ProtocolJSONRPC.
+func (b *SSEClientBuilder) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// SSEServerBuilder implements common.ServerBuilder for SSE-enabled JSON-RPC.
+// It is stateless; its Build method always returns an SSEServer.
+type SSEServerBuilder struct{}
+
+// Compile-time check that *SSEServerBuilder satisfies common.ServerBuilder.
+var _ common.ServerBuilder = (*SSEServerBuilder)(nil)
+
+// Build constructs an SSE-capable JSON-RPC server from a generic config.
+//
+// Defaults are 30s read timeout, 30s write timeout and 120s idle timeout.
+// Recognized Options keys:
+//   - "stream_path" (string): HTTP path of the streaming endpoint
+//   - "read_timeout", "write_timeout", "idle_timeout" (duration string or
+//     time.Duration): a string that fails to parse is ignored
+//
+// The "streaming" option does not influence the result: an SSEServer is
+// returned in every case. An error is returned only when config.Protocol is
+// not ProtocolJSONRPC.
+func (b *SSEServerBuilder) Build(config *common.Config) (common.Server, error) {
+	if config.Protocol != common.ProtocolJSONRPC {
+		return nil, fmt.Errorf("invalid protocol %s for SSE JSON-RPC server", config.Protocol)
+	}
+
+	serverConfig := &ServerConfig{
+		Address:      config.Address,
+		ReadTimeout:  30 * time.Second,
+		WriteTimeout: 30 * time.Second,
+		IdleTimeout:  120 * time.Second,
+	}
+	sseConfig := &SSEServerConfig{ServerConfig: serverConfig}
+
+	// Lookups in a nil Options map simply find nothing.
+	if path, ok := config.Options["stream_path"].(string); ok {
+		sseConfig.StreamPath = path
+	}
+
+	// Each option key maps to the config field it overrides.
+	overrides := []struct {
+		key    string
+		target *time.Duration
+	}{
+		{"read_timeout", &serverConfig.ReadTimeout},
+		{"write_timeout", &serverConfig.WriteTimeout},
+		{"idle_timeout", &serverConfig.IdleTimeout},
+	}
+	for _, override := range overrides {
+		switch value := config.Options[override.key].(type) {
+		case string:
+			if parsed, err := time.ParseDuration(value); err == nil {
+				*override.target = parsed
+			}
+		case time.Duration:
+			*override.target = value
+		}
+	}
+
+	return NewSSEServer(sseConfig), nil
+}
+
+// Protocol reports the protocol this builder produces servers for, which is
+// always common.ProtocolJSONRPC.
+func (b *SSEServerBuilder) Protocol() common.Protocol {
+ return common.ProtocolJSONRPC
+}
+
+// Convenience constructors
+
+// NewSSEJSONRPCClient is a shortcut for NewSSEClient that takes the unary
+// endpoint and the streaming endpoint directly. The timeout is left at zero.
+func NewSSEJSONRPCClient(endpoint string, streamEndpoint string) *SSEClient {
+	base := &ClientConfig{
+		Endpoint: endpoint,
+		Headers:  map[string]string{},
+	}
+	return NewSSEClient(&SSEClientConfig{
+		ClientConfig:   base,
+		StreamEndpoint: streamEndpoint,
+	})
+}
+
+// NewSSEJSONRPCServer is a shortcut for NewSSEServer that takes the listen
+// address and the streaming path directly. All timeouts are left at zero.
+func NewSSEJSONRPCServer(address string, streamPath string) *SSEServer {
+	base := &ServerConfig{Address: address}
+	return NewSSEServer(&SSEServerConfig{
+		ServerConfig: base,
+		StreamPath:   streamPath,
+	})
+}
diff --git a/transport/jsonrpc/sse_client.go b/transport/jsonrpc/sse_client.go
new file mode 100644
index 00000000..4f22916c
--- /dev/null
+++ b/transport/jsonrpc/sse_client.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "sync"
+ "sync/atomic"
+
+ "seata-go-ai-transport/common"
+)
+
+// SSEClient implements streaming JSON-RPC over Server-Sent Events.
+// It embeds *Client, so unary calls behave exactly like the basic client;
+// only Stream is overridden.
+type SSEClient struct {
+ *Client
+ // streamEndpoint is the URL streaming requests are posted to.
+ streamEndpoint string
+}
+
+// SSEClientConfig extends ClientConfig with streaming endpoint.
+type SSEClientConfig struct {
+ *ClientConfig
+ // StreamEndpoint is the URL for streaming calls. When empty, NewSSEClient
+ // derives it from Endpoint by appending "/stream".
+ StreamEndpoint string `json:"stream_endpoint"`
+}
+
+// Compile-time check that *SSEClient satisfies common.Client.
+var _ common.Client = (*SSEClient)(nil)
+
+// NewSSEClient builds an SSE-enabled JSON-RPC client from config.
+//
+// When config.StreamEndpoint is empty, the streaming URL defaults to the
+// unary endpoint with "/stream" appended. Trailing slashes on the endpoint
+// are dropped first, so "http://host/" gives "http://host/stream" rather
+// than "http://host//stream".
+func NewSSEClient(config *SSEClientConfig) *SSEClient {
+	client := NewClient(config.ClientConfig)
+
+	streamEndpoint := config.StreamEndpoint
+	if streamEndpoint == "" {
+		// A doubled slash is not the registered stream path; an HTTP mux that
+		// cleans paths answers it with a redirect instead of the handler.
+		base := config.Endpoint
+		for len(base) > 0 && base[len(base)-1] == '/' {
+			base = base[:len(base)-1]
+		}
+		streamEndpoint = base + "/stream"
+	}
+
+	return &SSEClient{
+		Client:         client,
+		streamEndpoint: streamEndpoint,
+	}
+}
+
+// Stream starts a streaming JSON-RPC call over Server-Sent Events.
+//
+// The request is POSTed to the stream endpoint with an
+// "Accept: text/event-stream" header. msg.Payload, when non-empty, must be
+// valid JSON. On success the returned reader owns the HTTP response body and
+// the caller must Close it. A non-200 status closes the body and returns an
+// error. Calling Stream after Close returns common.ErrClientClosed.
+//
+// NOTE(review): the request goes through the shared http.Client, whose
+// Timeout also covers reading the response body, so a long-lived stream may
+// be cut off once that timeout elapses. Confirm this is intended.
+func (c *SSEClient) Stream(ctx context.Context, msg *common.Message) (common.StreamReader, error) {
+	if atomic.LoadInt32(&c.closed) == 1 {
+		return nil, common.ErrClientClosed
+	}
+
+	// Generate unique ID for the request
+	id := atomic.AddInt64(&c.idCounter, 1)
+
+	// Decode the raw payload so it is embedded as structured params.
+	var params interface{}
+	if len(msg.Payload) > 0 {
+		if err := json.Unmarshal(msg.Payload, &params); err != nil {
+			return nil, fmt.Errorf("failed to unmarshal payload: %w", err)
+		}
+	}
+
+	req, err := NewRequest(msg.Method, params, id)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	// Marshal request to JSON
+	reqData, err := json.Marshal(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal request: %w", err)
+	}
+
+	// Create HTTP request for streaming
+	httpReq, err := http.NewRequestWithContext(ctx, "POST", c.streamEndpoint, bytes.NewReader(reqData))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
+	}
+
+	// Client defaults first, then per-message headers so the latter override.
+	c.mu.RLock()
+	for k, v := range c.headers {
+		httpReq.Header.Set(k, v)
+	}
+	for k, v := range msg.Headers {
+		httpReq.Header.Set(k, v)
+	}
+	c.mu.RUnlock()
+
+	// The SSE headers are set last so nothing above can override them.
+	httpReq.Header.Set("Accept", "text/event-stream")
+	httpReq.Header.Set("Cache-Control", "no-cache")
+
+	// Send request
+	httpResp, err := c.httpClient.Do(httpReq)
+	if err != nil {
+		return nil, fmt.Errorf("failed to send request: %w", err)
+	}
+
+	// On failure the body is closed here because no reader will take it over.
+	if httpResp.StatusCode != http.StatusOK {
+		_ = httpResp.Body.Close()
+		return nil, fmt.Errorf("HTTP error: %d", httpResp.StatusCode)
+	}
+
+	// The reader takes ownership of the response body.
+	return NewSSEStreamReader(ctx, httpResp), nil
+}
+
+// SSEStreamReader implements common.StreamReader for SSE.
+// It turns the events of one HTTP response into StreamResponse values.
+type SSEStreamReader struct {
+ // ctx is the context the stream was opened with; it is handed to the parser.
+ ctx context.Context
+ // resp is the HTTP response whose body carries the events; Close closes it.
+ resp *http.Response
+ // parser reads events from resp.Body.
+ parser *SSEParser
+ // closed is set to 1 by Close; accessed atomically.
+ closed int32
+ mu sync.Mutex
+}
+
+// Compile-time check that *SSEStreamReader satisfies common.StreamReader.
+var _ common.StreamReader = (*SSEStreamReader)(nil)
+
+// NewSSEStreamReader wraps an open HTTP response in a stream reader. The
+// reader parses events from resp.Body and closes that body in Close.
+func NewSSEStreamReader(ctx context.Context, resp *http.Response) *SSEStreamReader {
+	reader := new(SSEStreamReader)
+	reader.ctx = ctx
+	reader.resp = resp
+	reader.parser = NewSSEParser(ctx, resp.Body)
+	return reader
+}
+
+// Recv returns the next message of the stream.
+//
+// The only non-nil Go error is common.ErrStreamClosed, returned after Close.
+// Every other failure is reported inside the StreamResponse with a nil error:
+//   - a parser failure sets Error and Done
+//   - event data that is not a JSON-RPC response sets Error only
+//   - a JSON-RPC error object sets Error and leaves Data empty
+//
+// On success Data holds the JSON-RPC result. Headers holds the first value of
+// each HTTP response header plus "SSE-Event-ID" and "SSE-Event-Type" when the
+// event carries them. Done is set for events of type "end" or "close".
+func (r *SSEStreamReader) Recv() (*common.StreamResponse, error) {
+	if atomic.LoadInt32(&r.closed) == 1 {
+		return nil, common.ErrStreamClosed
+	}
+
+	event, err := r.parser.NextEvent()
+	if err != nil {
+		return &common.StreamResponse{Done: true, Error: err}, nil
+	}
+
+	// Each event's data field is expected to be a JSON-RPC response.
+	var rpcResp JSONRPCResponse
+	if decodeErr := json.Unmarshal([]byte(event.Data), &rpcResp); decodeErr != nil {
+		return &common.StreamResponse{
+			Error: fmt.Errorf("failed to unmarshal SSE data: %w", decodeErr),
+		}, nil
+	}
+
+	// Start from the HTTP response headers, then layer SSE metadata on top.
+	headers := make(map[string]string)
+	for name, values := range r.resp.Header {
+		if len(values) == 0 {
+			continue
+		}
+		headers[name] = values[0]
+	}
+	if event.ID != "" {
+		headers["SSE-Event-ID"] = event.ID
+	}
+	if event.Event != "" {
+		headers["SSE-Event-Type"] = event.Event
+	}
+
+	out := &common.StreamResponse{Headers: headers}
+
+	// An error object ends processing of this event before Data or Done are set.
+	if rpcResp.Error != nil {
+		out.Error = rpcResp.Error
+		return out, nil
+	}
+
+	out.Data = rpcResp.Result
+
+	switch event.Event {
+	case "end", "close":
+		out.Done = true
+	}
+
+	return out, nil
+}
+
+// Close marks the reader as closed and closes the HTTP response body.
+// A second Close returns common.ErrStreamClosed.
+func (r *SSEStreamReader) Close() error {
+	// Only the caller that flips the flag from 0 to 1 closes the body.
+	if atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
+		return r.resp.Body.Close()
+	}
+	return common.ErrStreamClosed
+}
diff --git a/transport/jsonrpc/sse_server.go b/transport/jsonrpc/sse_server.go
new file mode 100644
index 00000000..91af3227
--- /dev/null
+++ b/transport/jsonrpc/sse_server.go
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "sync/atomic"
+
+ "seata-go-ai-transport/common"
+)
+
+// SSEServer extends the basic JSON-RPC server with SSE streaming support.
+// It embeds *Server, so the embedded server's fields and methods are
+// promoted onto SSEServer.
+type SSEServer struct {
+ *Server // Embed regular server
+}
+
+// SSEServerConfig extends ServerConfig with the settings needed by the SSE
+// streaming endpoint.
+type SSEServerConfig struct {
+ *ServerConfig
+ // StreamPath is the URL path on which the SSE handler is registered.
+ StreamPath string `json:"stream_path"`
+}
+
+// Compile-time assertion that *SSEServer implements common.Server.
+var _ common.Server = (*SSEServer)(nil)
+
+// NewSSEServer creates a new SSE-enabled JSON-RPC server.
+//
+// The regular server is built from config.ServerConfig and the SSE streaming
+// handler is registered at config.StreamPath ("/stream" when empty).
+func NewSSEServer(config *SSEServerConfig) *SSEServer {
+	server := NewServer(config.ServerConfig)
+
+	streamPath := config.StreamPath
+	if streamPath == "" {
+		streamPath = "/stream"
+	}
+
+	sseServer := &SSEServer{
+		Server: server,
+	}
+
+	// Use a checked assertion: if the base handler is not a *http.ServeMux
+	// (for example because it was wrapped by middleware), install a fresh mux
+	// that delegates everything else to the existing handler instead of
+	// panicking on the type assertion.
+	mux, ok := server.server.Handler.(*http.ServeMux)
+	if !ok {
+		mux = http.NewServeMux()
+		// Registering "/" twice would panic, so skip the fallback route when
+		// the stream endpoint itself claims "/".
+		if existing := server.server.Handler; existing != nil && streamPath != "/" {
+			mux.Handle("/", existing)
+		}
+		server.server.Handler = mux
+	}
+
+	// Add SSE streaming endpoint
+	mux.HandleFunc(streamPath, sseServer.handleSSEStream)
+
+	return sseServer
+}
+
+// handleSSEStream is the HTTP entry point for SSE streaming requests.
+// Only POST is accepted. The body must be a JSON-RPC request whose "jsonrpc"
+// member equals Version; failures are reported to the client as SSE error
+// events.
+func (s *SSEServer) handleSSEStream(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodPost {
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	// SSE response headers, applied before anything is written.
+	header := w.Header()
+	for _, kv := range [][2]string{
+		{"Content-Type", "text/event-stream"},
+		{"Cache-Control", "no-cache"},
+		{"Connection", "keep-alive"},
+		{"Access-Control-Allow-Origin", "*"},
+		{"Access-Control-Allow-Headers", "Cache-Control"},
+	} {
+		header.Set(kv[0], kv[1])
+	}
+
+	// Read and decode the JSON-RPC request; both failure modes map to
+	// ParseError with no request id.
+	var req JSONRPCRequest
+	raw, readErr := io.ReadAll(r.Body)
+	if readErr != nil || json.Unmarshal(raw, &req) != nil {
+		s.writeSSEError(w, ParseError, nil)
+		return
+	}
+
+	if req.JSONRPC != Version {
+		s.writeSSEError(w, InvalidRequest, req.ID)
+		return
+	}
+
+	s.handleStreamRequest(w, r, &req)
+}
+
+// handleStreamRequest dispatches one streaming JSON-RPC request to the
+// stream handler registered for req.Method. An unknown method yields a
+// MethodNotFound error event; a handler error yields an internal-error event;
+// on success an "end" event is emitted after the handler returns.
+func (s *SSEServer) handleStreamRequest(w http.ResponseWriter, r *http.Request, req *JSONRPCRequest) {
+	s.mu.RLock()
+	handler, found := s.streamHandlers[req.Method]
+	s.mu.RUnlock()
+
+	if !found {
+		s.writeSSEError(w, MethodNotFound, req.ID)
+		return
+	}
+
+	// Flatten the HTTP headers, keeping the first value of each.
+	headers := make(map[string]string)
+	for name, vals := range r.Header {
+		if len(vals) == 0 {
+			continue
+		}
+		headers[name] = vals[0]
+	}
+
+	msg := &common.Message{
+		Method:  req.Method,
+		Payload: req.Params,
+		Headers: headers,
+	}
+
+	streamWriter := NewSSEStreamWriter(w, req.ID)
+
+	err := handler(r.Context(), msg, streamWriter)
+	if err == nil {
+		// Handler finished cleanly: tell the client the stream is over.
+		streamWriter.sendEndEvent()
+		return
+	}
+
+	s.writeSSEError(w, NewJSONRPCError(InternalErrorCode, err.Error(), nil), req.ID)
+}
+
+// writeSSEError writes a JSON-RPC error as an SSE "error" event and flushes
+// it to the client.
+//
+// If the error response cannot be marshaled, a plain "Internal Server Error"
+// payload is sent instead. Both paths flush, so the fallback event is not
+// left sitting in the response buffer.
+func (s *SSEServer) writeSSEError(w http.ResponseWriter, jsonErr *JSONRPCError, id interface{}) {
+	payload := "Internal Server Error"
+	if data, err := json.Marshal(NewErrorResponse(jsonErr, id)); err == nil {
+		payload = string(data)
+	}
+
+	event := &SSEEvent{
+		Event: "error",
+		Data:  payload,
+	}
+
+	// Best effort: this method has no error return, and a failed write means
+	// the client is gone, so there is nobody left to report to.
+	_, _ = fmt.Fprint(w, event.String())
+
+	if f, ok := w.(http.Flusher); ok {
+		f.Flush()
+	}
+}
+
+// SSEStreamWriter implements common.StreamWriter for SSE.
+// It writes JSON-RPC responses as SSE events to an http.ResponseWriter.
+type SSEStreamWriter struct {
+ // writer receives the formatted SSE events.
+ writer http.ResponseWriter
+ // flusher is nil when writer does not implement http.Flusher.
+ flusher http.Flusher
+ // id is passed as the id of every JSON-RPC response built by this writer.
+ id interface{}
+ // eventID is incremented atomically to produce the SSE "id" field.
+ eventID int64
+ // closed is set to 1 (atomically) by Close.
+ closed int32
+ // mu is held while an event is written and flushed.
+ mu sync.Mutex
+}
+
+// Compile-time assertion that *SSEStreamWriter implements common.StreamWriter.
+var _ common.StreamWriter = (*SSEStreamWriter)(nil)
+
+// NewSSEStreamWriter returns a stream writer that emits SSE events on w and
+// tags every JSON-RPC response with id. When w also implements http.Flusher
+// it is remembered so events can be flushed as they are written.
+func NewSSEStreamWriter(w http.ResponseWriter, id interface{}) *SSEStreamWriter {
+	sw := &SSEStreamWriter{writer: w, id: id}
+	if f, ok := w.(http.Flusher); ok {
+		sw.flusher = f
+	}
+	return sw
+}
+
+// Send sends a message to the stream as one SSE event.
+//
+// A response with a non-nil Error is sent as an "error" event carrying a
+// JSON-RPC error response. Errors that are not (and do not wrap) a
+// *JSONRPCError are converted to an internal error holding the original
+// message, so the client never receives an error event without an error
+// member. Otherwise resp.Data is embedded verbatim as the JSON-RPC result in
+// a "data" event, or an "end" event when resp.Done is set.
+//
+// It returns common.ErrStreamClosed after Close, and a wrapped error when
+// encoding or writing fails.
+func (w *SSEStreamWriter) Send(resp *common.StreamResponse) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	// Checked while holding the lock so that a concurrent Close cannot slip
+	// in between the check and the write.
+	if atomic.LoadInt32(&w.closed) == 1 {
+		return common.ErrStreamClosed
+	}
+
+	// Handle error response
+	if resp.Error != nil {
+		var jsonErr *JSONRPCError
+		if !errors.As(resp.Error, &jsonErr) {
+			jsonErr = NewJSONRPCError(InternalErrorCode, resp.Error.Error(), nil)
+		}
+
+		data, err := json.Marshal(NewErrorResponse(jsonErr, w.id))
+		if err != nil {
+			return fmt.Errorf("failed to marshal error response: %w", err)
+		}
+		return w.writeEventLocked("error", data)
+	}
+
+	// Pass the payload through as raw JSON. Decoding it into interface{} and
+	// re-encoding would turn every number into a float64 and lose precision
+	// on large integers. Invalid JSON is still rejected when it is marshaled.
+	var result interface{}
+	if len(resp.Data) > 0 {
+		result = json.RawMessage(resp.Data)
+	}
+
+	jsonResp, err := NewResponse(result, w.id)
+	if err != nil {
+		return fmt.Errorf("failed to create JSON-RPC response: %w", err)
+	}
+
+	data, err := json.Marshal(jsonResp)
+	if err != nil {
+		return fmt.Errorf("failed to marshal JSON-RPC response: %w", err)
+	}
+
+	// Determine event type
+	eventType := "data"
+	if resp.Done {
+		eventType = "end"
+	}
+
+	return w.writeEventLocked(eventType, data)
+}
+
+// writeEventLocked writes one SSE event with the next event id and flushes
+// it. The caller must hold w.mu.
+func (w *SSEStreamWriter) writeEventLocked(eventType string, data []byte) error {
+	event := &SSEEvent{
+		ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 1)),
+		Event: eventType,
+		Data:  string(data),
+	}
+
+	if _, err := fmt.Fprint(w.writer, event.String()); err != nil {
+		return fmt.Errorf("failed to write SSE event: %w", err)
+	}
+
+	if w.flusher != nil {
+		w.flusher.Flush()
+	}
+	return nil
+}
+
+// Close closes the stream by emitting a final "close" event.
+//
+// Only the first call writes the event; later calls return
+// common.ErrStreamClosed. A failure to write the close event is returned
+// instead of being dropped, matching how Send reports write failures. The
+// stream is marked closed either way.
+func (w *SSEStreamWriter) Close() error {
+	if !atomic.CompareAndSwapInt32(&w.closed, 0, 1) {
+		return common.ErrStreamClosed
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	// Send close event
+	event := &SSEEvent{
+		ID:    fmt.Sprintf("%d", atomic.AddInt64(&w.eventID, 1)),
+		Event: "close",
+		Data:  "",
+	}
+
+	if _, err := fmt.Fprint(w.writer, event.String()); err != nil {
+		return fmt.Errorf("failed to write SSE close event: %w", err)
+	}
+
+	if w.flusher != nil {
+		w.flusher.Flush()
+	}
+
+	return nil
+}
+
+// sendEndEvent emits an "end" event with empty data to signal that the
+// stream is complete. It does nothing once the writer has been closed, and
+// any write error is ignored.
+func (w *SSEStreamWriter) sendEndEvent() {
+	if atomic.LoadInt32(&w.closed) != 1 {
+		w.mu.Lock()
+		defer w.mu.Unlock()
+
+		seq := atomic.AddInt64(&w.eventID, 1)
+		end := SSEEvent{ID: fmt.Sprintf("%d", seq), Event: "end"}
+
+		fmt.Fprint(w.writer, end.String())
+
+		if w.flusher != nil {
+			w.flusher.Flush()
+		}
+	}
+}
diff --git a/transport/jsonrpc/sse_types.go b/transport/jsonrpc/sse_types.go
new file mode 100644
index 00000000..2be57088
--- /dev/null
+++ b/transport/jsonrpc/sse_types.go
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "strings"
+)
+
+// SSEEvent represents a Server-Sent Event.
+// Each field corresponds to the SSE wire field of the same name.
+type SSEEvent struct {
+ // ID is written as the "id:" field; omitted when empty.
+ ID string `json:"id,omitempty"`
+ // Event is written as the "event:" field; omitted when empty.
+ Event string `json:"event,omitempty"`
+ // Data is written as one "data:" line per "\n"-separated line.
+ Data string `json:"data"`
+ // Retry is written as the "retry:" field only when greater than zero.
+ Retry int `json:"retry,omitempty"`
+}
+
+// String renders the event in SSE wire format: optional "id:", "event:" and
+// "retry:" lines, one "data:" line per line of Data, and a terminating blank
+// line.
+func (e *SSEEvent) String() string {
+	var out strings.Builder
+
+	if e.ID != "" {
+		out.WriteString("id: " + e.ID + "\n")
+	}
+	if e.Event != "" {
+		out.WriteString("event: " + e.Event + "\n")
+	}
+	if e.Retry > 0 {
+		fmt.Fprintf(&out, "retry: %d\n", e.Retry)
+	}
+
+	// Multi-line data becomes several consecutive data fields.
+	for _, chunk := range strings.Split(e.Data, "\n") {
+		out.WriteString("data: " + chunk + "\n")
+	}
+
+	out.WriteByte('\n')
+	return out.String()
+}
+
+// SSEParser parses Server-Sent Events from a reader.
+type SSEParser struct {
+ // scanner yields the input one line at a time.
+ scanner *bufio.Scanner
+ // ctx is polled for cancellation before each line is read.
+ ctx context.Context
+}
+
+// NewSSEParser creates a new SSE parser
+func NewSSEParser(ctx context.Context, reader io.Reader) *SSEParser {
+ return &SSEParser{
+ scanner: bufio.NewScanner(reader),
+ ctx: ctx,
+ }
+}
+
+// NextEvent reads lines until one complete SSE event has been collected.
+//
+// An event ends at a blank line or at end of input, and is returned only if
+// it has at least one data line, an id or an event name. Comment lines
+// (starting with ':') and "retry" fields are ignored. It returns io.EOF when
+// the input ends with nothing collected, the context error when ctx is done,
+// and a wrapped scanner error on read failure.
+func (p *SSEParser) NextEvent() (*SSEEvent, error) {
+	var (
+		current = &SSEEvent{}
+		payload []string
+	)
+
+	// pending reports whether anything worth returning has been collected.
+	pending := func() bool {
+		return len(payload) > 0 || current.ID != "" || current.Event != ""
+	}
+	// finish joins the collected data lines and hands back the event.
+	finish := func() *SSEEvent {
+		current.Data = strings.Join(payload, "\n")
+		return current
+	}
+
+	for {
+		// Non-blocking cancellation check before each read.
+		select {
+		case <-p.ctx.Done():
+			return nil, p.ctx.Err()
+		default:
+		}
+
+		if !p.scanner.Scan() {
+			if scanErr := p.scanner.Err(); scanErr != nil {
+				return nil, fmt.Errorf("scanner error: %w", scanErr)
+			}
+			// End of input: return a trailing event if there is one.
+			if pending() {
+				return finish(), nil
+			}
+			return nil, io.EOF
+		}
+
+		line := p.scanner.Text()
+
+		switch {
+		case line == "":
+			// A blank line terminates the current event.
+			if pending() {
+				return finish(), nil
+			}
+			continue
+		case strings.HasPrefix(line, ":"):
+			// Comment line.
+			continue
+		}
+
+		parts := strings.SplitN(line, ":", 2)
+		if len(parts) == 1 {
+			// Field name without a colon: only a bare "data" has an effect.
+			if line == "data" {
+				payload = append(payload, "")
+			}
+			continue
+		}
+
+		// Strip a single leading space from the value.
+		field, value := parts[0], strings.TrimPrefix(parts[1], " ")
+
+		switch field {
+		case "id":
+			current.ID = value
+		case "event":
+			current.Event = value
+		case "data":
+			payload = append(payload, value)
+		case "retry":
+			// Parse retry value (not implemented in this basic version)
+		}
+	}
+}
diff --git a/transport/jsonrpc/types.go b/transport/jsonrpc/types.go
new file mode 100644
index 00000000..f766c5d1
--- /dev/null
+++ b/transport/jsonrpc/types.go
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jsonrpc
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// Version is the JSON-RPC protocol version string placed in the "jsonrpc"
+// member of requests and responses built by this package.
+const Version = "2.0"
+
+// JSONRPCRequest represents a JSON-RPC 2.0 request.
+type JSONRPCRequest struct {
+ // JSONRPC is the protocol version member.
+ JSONRPC string `json:"jsonrpc"`
+ // Method is the name of the method to invoke.
+ Method string `json:"method"`
+ // Params holds the still-encoded parameters; omitted when empty.
+ Params json.RawMessage `json:"params,omitempty"`
+ // ID is the request identifier; a nil ID is omitted from the encoding.
+ ID interface{} `json:"id,omitempty"`
+}
+
+// JSONRPCResponse represents a JSON-RPC 2.0 response.
+type JSONRPCResponse struct {
+ // JSONRPC is the protocol version member.
+ JSONRPC string `json:"jsonrpc"`
+ // Result holds the still-encoded result; omitted when empty.
+ Result json.RawMessage `json:"result,omitempty"`
+ // Error is the error object; omitted when nil.
+ Error *JSONRPCError `json:"error,omitempty"`
+ // ID is the response identifier; a nil ID is omitted from the encoding.
+ ID interface{} `json:"id,omitempty"`
+}
+
+// JSONRPCError represents a JSON-RPC 2.0 error object.
+// *JSONRPCError implements the error interface.
+type JSONRPCError struct {
+ // Code is the numeric error code.
+ Code int `json:"code"`
+ // Message is a short description of the error.
+ Message string `json:"message"`
+ // Data is optional additional information; omitted when nil.
+ Data interface{} `json:"data,omitempty"`
+}
+
+// Error implements the error interface. The text contains the code and
+// message, followed by the attached data when Data is non-nil.
+func (e *JSONRPCError) Error() string {
+	base := fmt.Sprintf("jsonrpc error [%d]: %s", e.Code, e.Message)
+	if e.Data == nil {
+		return base
+	}
+	return base + fmt.Sprintf(" (data: %v)", e.Data)
+}
+
+// Standard JSON-RPC error codes.
+// Each of the first five is paired with the predefined error value of the
+// matching name (ParseError, InvalidRequest, MethodNotFound, InvalidParams,
+// InternalError).
+const (
+ ParseErrorCode = -32700
+ InvalidRequestCode = -32600
+ MethodNotFoundCode = -32601
+ InvalidParamsCode = -32602
+ InternalErrorCode = -32603
+ ServerErrorCode = -32000 // to -32099 range for server errors
+)
+
+// Standard JSON-RPC errors.
+//
+// These are package-level pointers shared by every caller, so they should be
+// treated as read-only; use NewJSONRPCError to build an error that carries
+// request-specific data.
+var (
+	ParseError     = &JSONRPCError{Code: ParseErrorCode, Message: "Parse error"}
+	InvalidRequest = &JSONRPCError{Code: InvalidRequestCode, Message: "Invalid Request"}
+	MethodNotFound = &JSONRPCError{Code: MethodNotFoundCode, Message: "Method not found"}
+	InvalidParams  = &JSONRPCError{Code: InvalidParamsCode, Message: "Invalid params"}
+	InternalError  = &JSONRPCError{Code: InternalErrorCode, Message: "Internal error"}
+)
+
+// NewJSONRPCError builds a JSON-RPC error object from the given code,
+// message and optional data.
+func NewJSONRPCError(code int, message string, data interface{}) *JSONRPCError {
+	rpcErr := new(JSONRPCError)
+	rpcErr.Code, rpcErr.Message, rpcErr.Data = code, message, data
+	return rpcErr
+}
+
+// NewRequest creates a new JSON-RPC request
+func NewRequest(method string, params interface{}, id interface{})
(*JSONRPCRequest, error) {
+ var paramsBytes json.RawMessage
+ if params != nil {
+ data, err := json.Marshal(params)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal params: %w",
err)
+ }
+ paramsBytes = data
+ }
+
+ return &JSONRPCRequest{
+ JSONRPC: Version,
+ Method: method,
+ Params: paramsBytes,
+ ID: id,
+ }, nil
+}
+
+// NewResponse creates a new JSON-RPC response with result
+func NewResponse(result interface{}, id interface{}) (*JSONRPCResponse, error)
{
+ var resultBytes json.RawMessage
+ if result != nil {
+ data, err := json.Marshal(result)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal result: %w",
err)
+ }
+ resultBytes = data
+ }
+
+ return &JSONRPCResponse{
+ JSONRPC: Version,
+ Result: resultBytes,
+ ID: id,
+ }, nil
+}
+
+// NewErrorResponse creates a new JSON-RPC error response for the given id.
+//
+// A nil err is replaced with a generic internal error. Without this, the
+// "error" member would be omitted during encoding and the response would
+// carry neither a result nor an error.
+func NewErrorResponse(err *JSONRPCError, id interface{}) *JSONRPCResponse {
+	if err == nil {
+		// Fresh value rather than the shared InternalError pointer, so the
+		// caller cannot mutate package state through the response.
+		err = &JSONRPCError{Code: InternalErrorCode, Message: "Internal error"}
+	}
+	return &JSONRPCResponse{
+		JSONRPC: Version,
+		Error:   err,
+		ID:      id,
+	}
+}
+
+// IsNotification reports whether the request carries no ID, which is how a
+// notification is recognized.
+func (r *JSONRPCRequest) IsNotification() bool {
+	hasID := r.ID != nil
+	return !hasID
+}
+
+// UnmarshalParams decodes the request params into v.
+// When the request has no params, v is left untouched and nil is returned.
+func (r *JSONRPCRequest) UnmarshalParams(v interface{}) error {
+	if len(r.Params) > 0 {
+		return json.Unmarshal(r.Params, v)
+	}
+	return nil
+}
+
+// UnmarshalResult decodes the response result into v.
+// If the response carries an error, that error is returned instead; if the
+// result is empty, v is left untouched and nil is returned.
+func (r *JSONRPCResponse) UnmarshalResult(v interface{}) error {
+	switch {
+	case r.Error != nil:
+		return r.Error
+	case len(r.Result) == 0:
+		return nil
+	default:
+		return json.Unmarshal(r.Result, v)
+	}
+}
diff --git a/transport/transport.go b/transport/transport.go
new file mode 100644
index 00000000..39434fa0
--- /dev/null
+++ b/transport/transport.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "seata-go-ai-transport/common"
+ _ "seata-go-ai-transport/grpc"
+ _ "seata-go-ai-transport/jsonrpc"
+)
+
+// Re-export common types for convenience.
+// These are type aliases, so each name is identical to the corresponding
+// type in the common package and the two can be used interchangeably.
+type (
+ Protocol = common.Protocol
+ StreamType = common.StreamType
+ Message = common.Message
+ Response = common.Response
+ StreamResponse = common.StreamResponse
+ Client = common.Client
+ Server = common.Server
+ Handler = common.Handler
+ StreamHandler = common.StreamHandler
+ StreamReader = common.StreamReader
+ StreamWriter = common.StreamWriter
+ Config = common.Config
+ ClientBuilder = common.ClientBuilder
+ ServerBuilder = common.ServerBuilder
+)
+
+// Re-export protocol constants.
+// Each constant has the same value as its counterpart in the common package.
+const (
+ ProtocolGRPC = common.ProtocolGRPC
+ ProtocolJSONRPC = common.ProtocolJSONRPC
+)
+
+// Re-export stream type constants.
+// Each constant has the same value as its counterpart in the common package.
+const (
+ StreamTypeUnary = common.StreamTypeUnary
+ StreamTypeServerStream = common.StreamTypeServerStream
+ StreamTypeClientStream = common.StreamTypeClientStream
+ StreamTypeBiDirStream = common.StreamTypeBiDirStream
+)
+
+// Re-export registry functions.
+// These are package-level variables holding the common package's functions,
+// so they are called exactly like the originals.
+var (
+ RegisterClientBuilder = common.RegisterClientBuilder
+ RegisterServerBuilder = common.RegisterServerBuilder
+ CreateClient = common.CreateClient
+ CreateServer = common.CreateServer
+ GetSupportedProtocols = common.GetSupportedProtocols
+)
+
+// Re-export utility functions.
+// These are package-level variables holding the common package's helpers,
+// so they are called exactly like the originals.
+var (
+ MarshalMessage = common.MarshalMessage
+ UnmarshalResponse = common.UnmarshalResponse
+ UnmarshalStreamResponse = common.UnmarshalStreamResponse
+ CreateResponse = common.CreateResponse
+ CreateStreamResponse = common.CreateStreamResponse
+ CreateErrorResponse = common.CreateErrorResponse
+ CreateErrorStreamResponse = common.CreateErrorStreamResponse
+)
+
+// Re-export common errors.
+// Each variable is initialized from the common package variable of the same
+// name.
+var (
+ ErrProtocolNotSupported = common.ErrProtocolNotSupported
+ ErrClientClosed = common.ErrClientClosed
+ ErrServerClosed = common.ErrServerClosed
+ ErrStreamClosed = common.ErrStreamClosed
+ ErrInvalidConfig = common.ErrInvalidConfig
+ ErrHandlerNotFound = common.ErrHandlerNotFound
+ ErrTimeout = common.ErrTimeout
+ ErrConnectionFailed = common.ErrConnectionFailed
+)
+
+// Re-export error functions.
+// These are package-level variables holding the common package's error
+// helpers, so they are called exactly like the originals.
+var (
+ NewTransportError = common.NewTransportError
+ IsTransportError = common.IsTransportError
+ GetTransportError = common.GetTransportError
+)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]