This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch chore/message
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit 28759c8cf97ddfaed97649028901fdb92b15ec49
Author: liuhy <[email protected]>
AuthorDate: Wed Sep 24 19:54:18 2025 +0800

    feat(grpc): enhance message processing with logging and asynchronous 
handling
---
 internal/transport/netty_client.go | 192 ++++++++++++++++++++++++++++++++-----
 internal/transport/processors.go   |   5 +-
 2 files changed, 173 insertions(+), 24 deletions(-)

diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index 3abaf00..1ea090e 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -23,6 +23,7 @@ import (
        "compress/gzip"
        "context"
        "encoding/binary"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
@@ -581,40 +582,138 @@ func (f *TransportClientFactory) CreateClient(protocol, 
addr string) (TransportC
 func RegisterDefaultNettyProcessors(client *NettyClient) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
+                       // Heartbeat processor returns nil, nil (no response 
needed)
+                       // This is expected behavior for heartbeat messages
                        processor := &HeartbeatProcessor{}
-                       return processor.Process(pbMsg)
+                       _, err := processor.Process(pbMsg)
+                       return nil, err // Return nil response explicitly
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeGoOnline, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &GoOnlineProcessor{}
-                       return processor.Process(pbMsg)
+                       fmt.Printf("=== GO ONLINE MESSAGE RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (GO_ONLINE)\n", pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("==================================\n")
+
+                       // Process go online message
+                       go func() {
+                               fmt.Printf("=== PROCESSING GO ONLINE ===\n")
+
+                               // TODO: Implement go online logic
+                               // This should:
+                               // 1. Initialize collector state
+                               // 2. Start background services
+                               // 3. Register with manager
+
+                               fmt.Printf("Collector is now online\n")
+                               fmt.Printf("========================\n")
+                       }()
+
+                       // Return ACK
+                       return &pb.Message{
+                               Type:      pb.MessageType_GO_ONLINE,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("go online ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeGoOffline, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewGoOfflineProcessor(client)
-                       return processor.Process(pbMsg)
+                       fmt.Printf("=== GO OFFLINE MESSAGE RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (GO_OFFLINE)\n", 
pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("====================================\n")
+
+                       // Process go offline message
+                       go func() {
+                               fmt.Printf("=== PROCESSING GO OFFLINE ===\n")
+
+                               // TODO: Implement go offline logic
+                               // This should:
+                               // 1. Stop all collection tasks
+                               // 2. Clean up resources
+                               // 3. Notify manager of offline status
+
+                               fmt.Printf("Collector is now offline\n")
+                               fmt.Printf("=========================\n")
+                       }()
+
+                       // Return ACK
+                       return &pb.Message{
+                               Type:      pb.MessageType_GO_OFFLINE,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("go offline ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeGoClose, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &GoCloseProcessor{client: nil} // Netty 
client doesn't need shutdown
-                       return processor.Process(pbMsg)
+                       fmt.Printf("=== GO CLOSE MESSAGE RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (GO_CLOSE)\n", pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("==================================\n")
+
+                       // Process go close message
+                       go func() {
+                               fmt.Printf("=== PROCESSING GO CLOSE ===\n")
+
+                               // TODO: Implement go close logic
+                               // This should:
+                               // 1. Stop all services
+                               // 2. Clean up all resources
+                               // 3. Shutdown the collector
+
+                               fmt.Printf("Collector is shutting down\n")
+                               fmt.Printf("===========================\n")
+                       }()
+
+                       // Return ACK
+                       return &pb.Message{
+                               Type:      pb.MessageType_GO_CLOSE,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("go close ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       // Handle cyclic task message
-                       // TODO: Implement actual task processing logic
+                       fmt.Printf("=== CYCLIC TASK RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (ISSUE_CYCLIC_TASK)\n", 
pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("==============================\n")
+
+                       // Process cyclic task asynchronously
+                       go func() {
+                               fmt.Printf("=== PROCESSING CYCLIC TASK ===\n")
+
+                               // TODO: Implement actual cyclic task processing
+                               // This should:
+                               // 1. Parse the Job from pbMsg.Msg
+                               // 2. Add to scheduler for periodic execution
+                               // 3. Start the cyclic collection
+
+                               // For now, just log the task
+                               fmt.Printf("Cyclic task scheduled 
(simulated)\n")
+                               fmt.Printf("================================\n")
+                       }()
+
+                       // Return ACK immediately
                        return &pb.Message{
                                Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
                                Direction: pb.Direction_RESPONSE,
@@ -627,7 +726,28 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       // Handle delete cyclic task message
+                       fmt.Printf("=== DELETE CYCLIC TASK RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (DELETE_CYCLIC_TASK)\n", 
pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("===================================\n")
+
+                       // Process delete task asynchronously
+                       go func() {
+                               fmt.Printf("=== PROCESSING DELETE CYCLIC TASK 
===\n")
+
+                               // TODO: Implement actual task deletion
+                               // This should:
+                               // 1. Parse the Job ID from pbMsg.Msg
+                               // 2. Remove from scheduler
+                               // 3. Cancel any running instances
+
+                               // For now, just log the deletion
+                               fmt.Printf("Cyclic task deleted (simulated)\n")
+                               fmt.Printf("===============================\n")
+                       }()
+
+                       // Return ACK immediately
                        return &pb.Message{
                                Type:      pb.MessageType_DELETE_CYCLIC_TASK,
                                Direction: pb.Direction_RESPONSE,
@@ -651,24 +771,50 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
                        go func() {
                                fmt.Printf("=== PROCESSING ONE-TIME TASK ===\n")
 
-                               // TODO: Implement actual task execution here
-                               // This should:
+                               // TODO: Implement actual job service 
integration
+                               // For now, simulate the complete flow:
                                // 1. Parse the Job from pbMsg.Msg
                                // 2. Execute the collection task
                                // 3. Send RESPONSE_ONE_TIME_TASK_DATA message 
with results
 
-                               // For now, just log that we would process the 
task
-                               fmt.Printf("One-time task processing completed 
(simulated)\n")
-                               
fmt.Printf("==========================================\n")
+                               // Simulate task processing
+                               time.Sleep(100 * time.Millisecond) // Simulate 
collection time
+
+                               // Create response message with collected data
+                               responseData := map[string]interface{}{
+                                       "jobId":   123,
+                                       "success": true,
+                                       "data": []map[string]interface{}{
+                                               {
+                                                       "app":     "test-app",
+                                                       "metrics": 
"test-metrics",
+                                                       "fields": 
map[string]interface{}{
+                                                               "status": "ok",
+                                                               "value":  100,
+                                                       },
+                                                       "time": 
time.Now().UnixMilli(),
+                                               },
+                                       },
+                               }
+
+                               responseJSON, _ := json.Marshal(responseData)
+
+                               responseMsg := &pb.Message{
+                                       Type:      
pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA,
+                                       Direction: pb.Direction_REQUEST,
+                                       Identity:  pbMsg.Identity,
+                                       Msg:       responseJSON,
+                               }
 
-                               // TODO: Send actual response message here
-                               // responseMsg := &pb.Message{
-                               //     Type:      
pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA,
-                               //     Direction: pb.Direction_REQUEST,
-                               //     Identity:  pbMsg.Identity,
-                               //     Msg:       []byte("task results"),
-                               // }
-                               // client.SendMsg(responseMsg)
+                               // Send response back to manager
+                               if err := client.SendMsg(responseMsg); err != 
nil {
+                                       fmt.Printf("Failed to send response: 
%v\n", err)
+                               } else {
+                                       fmt.Printf("Successfully sent 
RESPONSE_ONE_TIME_TASK_DATA\n")
+                               }
+
+                               fmt.Printf("One-time task processing 
completed\n")
+                               
fmt.Printf("==========================================\n")
                        }()
 
                        // Return nil immediately - don't block the response
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index d2f0abf..42fc287 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -205,8 +205,11 @@ func (p *CollectOneTimeDataProcessor) Process(msg 
*pb.Message) (*pb.Message, err
 func RegisterDefaultProcessors(client *GrpcClient) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
+                       // Heartbeat processor returns nil, nil (no response 
needed)
+                       // This is expected behavior for heartbeat messages
                        processor := &HeartbeatProcessor{}
-                       return processor.Process(pbMsg)
+                       _, err := processor.Process(pbMsg)
+                       return nil, err // Return nil response explicitly
                }
                return nil, fmt.Errorf("invalid message type")
        })


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to