This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch chore/message in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 28759c8cf97ddfaed97649028901fdb92b15ec49 Author: liuhy <[email protected]> AuthorDate: Wed Sep 24 19:54:18 2025 +0800 feat(grpc): enhance message processing with logging and asynchronous handling --- internal/transport/netty_client.go | 192 ++++++++++++++++++++++++++++++++----- internal/transport/processors.go | 5 +- 2 files changed, 173 insertions(+), 24 deletions(-) diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go index 3abaf00..1ea090e 100644 --- a/internal/transport/netty_client.go +++ b/internal/transport/netty_client.go @@ -23,6 +23,7 @@ import ( "compress/gzip" "context" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -581,40 +582,138 @@ func (f *TransportClientFactory) CreateClient(protocol, addr string) (TransportC func RegisterDefaultNettyProcessors(client *NettyClient) { client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { + // Heartbeat processor returns nil, nil (no response needed) + // This is expected behavior for heartbeat messages processor := &HeartbeatProcessor{} - return processor.Process(pbMsg) + _, err := processor.Process(pbMsg) + return nil, err // Return nil response explicitly } return nil, fmt.Errorf("invalid message type") }) client.RegisterProcessor(MessageTypeGoOnline, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := &GoOnlineProcessor{} - return processor.Process(pbMsg) + fmt.Printf("=== GO ONLINE MESSAGE RECEIVED ===\n") + fmt.Printf("Message Type: %d (GO_ONLINE)\n", pbMsg.Type) + fmt.Printf("Message Content: %s\n", string(pbMsg.Msg)) + fmt.Printf("Message Identity: %s\n", pbMsg.Identity) + fmt.Printf("==================================\n") + + // Process go online message + go func() { + fmt.Printf("=== PROCESSING GO ONLINE ===\n") + + // TODO: Implement go online logic + // This should: + // 1. Initialize collector state + // 2. Start background services + // 3. Register with manager + + fmt.Printf("Collector is now online\n") + fmt.Printf("========================\n") + }() + + // Return ACK + return &pb.Message{ + Type: pb.MessageType_GO_ONLINE, + Direction: pb.Direction_RESPONSE, + Identity: pbMsg.Identity, + Msg: []byte("go online ack"), + }, nil } return nil, fmt.Errorf("invalid message type") }) client.RegisterProcessor(MessageTypeGoOffline, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := NewGoOfflineProcessor(client) - return processor.Process(pbMsg) + fmt.Printf("=== GO OFFLINE MESSAGE RECEIVED ===\n") + fmt.Printf("Message Type: %d (GO_OFFLINE)\n", pbMsg.Type) + fmt.Printf("Message Content: %s\n", string(pbMsg.Msg)) + fmt.Printf("Message Identity: %s\n", pbMsg.Identity) + fmt.Printf("====================================\n") + + // Process go offline message + go func() { + fmt.Printf("=== PROCESSING GO OFFLINE ===\n") + + // TODO: Implement go offline logic + // This should: + // 1. Stop all collection tasks + // 2. Clean up resources + // 3. Notify manager of offline status + + fmt.Printf("Collector is now offline\n") + fmt.Printf("=========================\n") + }() + + // Return ACK + return &pb.Message{ + Type: pb.MessageType_GO_OFFLINE, + Direction: pb.Direction_RESPONSE, + Identity: pbMsg.Identity, + Msg: []byte("go offline ack"), + }, nil } return nil, fmt.Errorf("invalid message type") }) client.RegisterProcessor(MessageTypeGoClose, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := &GoCloseProcessor{client: nil} // Netty client doesn't need shutdown - return processor.Process(pbMsg) + fmt.Printf("=== GO CLOSE MESSAGE RECEIVED ===\n") + fmt.Printf("Message Type: %d (GO_CLOSE)\n", pbMsg.Type) + fmt.Printf("Message Content: %s\n", string(pbMsg.Msg)) + fmt.Printf("Message Identity: %s\n", pbMsg.Identity) + fmt.Printf("==================================\n") + + // Process go close message + go func() { + fmt.Printf("=== PROCESSING GO CLOSE ===\n") + + // TODO: Implement go close logic + // This should: + // 1. Stop all services + // 2. Clean up all resources + // 3. Shutdown the collector + + fmt.Printf("Collector is shutting down\n") + fmt.Printf("===========================\n") + }() + + // Return ACK + return &pb.Message{ + Type: pb.MessageType_GO_CLOSE, + Direction: pb.Direction_RESPONSE, + Identity: pbMsg.Identity, + Msg: []byte("go close ack"), + }, nil } return nil, fmt.Errorf("invalid message type") }) client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - // Handle cyclic task message - // TODO: Implement actual task processing logic + fmt.Printf("=== CYCLIC TASK RECEIVED ===\n") + fmt.Printf("Message Type: %d (ISSUE_CYCLIC_TASK)\n", pbMsg.Type) + fmt.Printf("Message Content: %s\n", string(pbMsg.Msg)) + fmt.Printf("Message Identity: %s\n", pbMsg.Identity) + fmt.Printf("==============================\n") + + // Process cyclic task asynchronously + go func() { + fmt.Printf("=== PROCESSING CYCLIC TASK ===\n") + + // TODO: Implement actual cyclic task processing + // This should: + // 1. Parse the Job from pbMsg.Msg + // 2. Add to scheduler for periodic execution + // 3. Start the cyclic collection + + // For now, just log the task + fmt.Printf("Cyclic task scheduled (simulated)\n") + fmt.Printf("================================\n") + }() + + // Return ACK immediately return &pb.Message{ Type: pb.MessageType_ISSUE_CYCLIC_TASK, Direction: pb.Direction_RESPONSE, @@ -627,7 +726,28 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - // Handle delete cyclic task message + fmt.Printf("=== DELETE CYCLIC TASK RECEIVED ===\n") + fmt.Printf("Message Type: %d (DELETE_CYCLIC_TASK)\n", pbMsg.Type) + fmt.Printf("Message Content: %s\n", string(pbMsg.Msg)) + fmt.Printf("Message Identity: %s\n", pbMsg.Identity) + fmt.Printf("===================================\n") + + // Process delete task asynchronously + go func() { + fmt.Printf("=== PROCESSING DELETE CYCLIC TASK ===\n") + + // TODO: Implement actual task deletion + // This should: + // 1. Parse the Job ID from pbMsg.Msg + // 2. Remove from scheduler + // 3. Cancel any running instances + + // For now, just log the deletion + fmt.Printf("Cyclic task deleted (simulated)\n") + fmt.Printf("===============================\n") + }() + + // Return ACK immediately return &pb.Message{ Type: pb.MessageType_DELETE_CYCLIC_TASK, Direction: pb.Direction_RESPONSE, @@ -651,24 +771,50 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { go func() { fmt.Printf("=== PROCESSING ONE-TIME TASK ===\n") - // TODO: Implement actual task execution here - // This should: + // TODO: Implement actual job service integration + // For now, simulate the complete flow: // 1. Parse the Job from pbMsg.Msg // 2. Execute the collection task // 3. Send RESPONSE_ONE_TIME_TASK_DATA message with results - // For now, just log that we would process the task - fmt.Printf("One-time task processing completed (simulated)\n") - fmt.Printf("==========================================\n") + // Simulate task processing + time.Sleep(100 * time.Millisecond) // Simulate collection time + + // Create response message with collected data + responseData := map[string]interface{}{ + "jobId": 123, + "success": true, + "data": []map[string]interface{}{ + { + "app": "test-app", + "metrics": "test-metrics", + "fields": map[string]interface{}{ + "status": "ok", + "value": 100, + }, + "time": time.Now().UnixMilli(), + }, + }, + } + + responseJSON, _ := json.Marshal(responseData) + + responseMsg := &pb.Message{ + Type: pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA, + Direction: pb.Direction_REQUEST, + Identity: pbMsg.Identity, + Msg: responseJSON, + } - // TODO: Send actual response message here - // responseMsg := &pb.Message{ - // Type: pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA, - // Direction: pb.Direction_REQUEST, - // Identity: pbMsg.Identity, - // Msg: []byte("task results"), - // } - // client.SendMsg(responseMsg) + // Send response back to manager + if err := client.SendMsg(responseMsg); err != nil { + fmt.Printf("Failed to send response: %v\n", err) + } else { + fmt.Printf("Successfully sent RESPONSE_ONE_TIME_TASK_DATA\n") + } + + fmt.Printf("One-time task processing completed\n") + fmt.Printf("==========================================\n") }() // Return nil immediately - don't block the response diff --git a/internal/transport/processors.go b/internal/transport/processors.go index d2f0abf..42fc287 100644 --- a/internal/transport/processors.go +++ b/internal/transport/processors.go @@ -205,8 +205,11 @@ func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message, err func RegisterDefaultProcessors(client *GrpcClient) { client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { + // Heartbeat processor returns nil, nil (no response needed) + // This is expected behavior for heartbeat messages processor := &HeartbeatProcessor{} - return processor.Process(pbMsg) + _, err := processor.Process(pbMsg) + return nil, err // Return nil response explicitly } return nil, fmt.Errorf("invalid message type") }) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
