TJxiaobao commented on code in PR #11:
URL: 
https://github.com/apache/hertzbeat-collector-go/pull/11#discussion_r2330430077


##########
internal/collector/common/job/job_server.go:
##########
@@ -17,51 +17,191 @@
  * under the License.
  */
 
-package transport
+package job
 
 import (
        "context"
+       "fmt"
+       "sync"
 
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
        clrServer 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
 )
 
+// TimeDispatcher interface defines time-based job scheduling
+type TimeDispatcher interface {
+       AddJob(job *jobtypes.Job) error
+       RemoveJob(jobID int64) error
+       Start(ctx context.Context) error
+       Stop() error
+}
+
+// Config represents job service configuration
 type Config struct {
        clrServer.Server
 }
 
+// Runner implements the service runner interface
 type Runner struct {
        Config
+       timeDispatch TimeDispatcher
+       mu           sync.RWMutex
+       runningJobs  map[int64]*jobtypes.Job
+       ctx          context.Context
+       cancel       context.CancelFunc
+}
+
+// AddAsyncCollectJob adds a job to async collection scheduling
+func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
+       if job == nil {
+               r.Logger.Error(nil, "job cannot be nil")
+               return fmt.Errorf("job cannot be nil")
+       }
+
+       r.Logger.Info("adding async collect job",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App,
+               "isCyclic", job.IsCyclic)
+
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       // Store job in running jobs map
+       r.runningJobs[job.ID] = job
+
+       // Add job to time dispatcher for scheduling
+       if err := r.timeDispatch.AddJob(job); err != nil {
+               delete(r.runningJobs, job.ID)
+               r.Logger.Error(err, "failed to add job to time dispatcher", 
"jobID", job.ID)
+               return fmt.Errorf("failed to add job to time dispatcher: %w", 
err)
+       }
+
+       r.Logger.Info("successfully added job to scheduler", "jobID", job.ID)
+       return nil
+}
+
+// RemoveAsyncCollectJob removes a job from scheduling
+func (r *Runner) RemoveAsyncCollectJob(jobID int64) error {
+       r.Logger.Info("removing async collect job", "jobID", jobID)
+
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       // Remove from running jobs
+       delete(r.runningJobs, jobID)
+
+       // Remove from time dispatcher
+       if err := r.timeDispatch.RemoveJob(jobID); err != nil {
+               r.Logger.Error(err, "failed to remove job from time 
dispatcher", "jobID", jobID)
+               return fmt.Errorf("failed to remove job from time dispatcher: 
%w", err)
+       }
+
+       r.Logger.Info("successfully removed job from scheduler", "jobID", jobID)
+       return nil
+}
+
+// RunningJobs returns a copy of currently running jobs
+func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+
+       result := make(map[int64]*jobtypes.Job)
+       for id, job := range r.runningJobs {
+               result[id] = job
+       }
+       return result
 }
 
+// New creates a new job service runner with all components initialized
 func New(srv *Config) *Runner {
+       ctx, cancel := context.WithCancel(context.Background())
 
-       return &Runner{
-               Config: *srv,
+       // Create result handler
+       resultHandler := collect.NewResultHandler(srv.Logger)
+
+       // Create metrics collector
+       metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
+
+       // Create common dispatcher
+       commonDispatcher := dispatcher.NewCommonDispatcher(srv.Logger, 
metricsCollector, resultHandler)
+
+       // Create time dispatcher
+       timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
+
+       runner := &Runner{
+               Config:       *srv,
+               timeDispatch: timeDispatch,
+               runningJobs:  make(map[int64]*jobtypes.Job),
+               ctx:          ctx,
+               cancel:       cancel,
        }
+
+       return runner
 }
 
+// Start starts the job service runner
 func (r *Runner) Start(ctx context.Context) error {
-
        r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", 
r.Info().Name)
+       r.Logger.Info("Starting job service runner")
+
+       // Start the time dispatcher
+       if r.timeDispatch != nil {
+               if err := r.timeDispatch.Start(ctx); err != nil {
+                       r.Logger.Error(err, "failed to start time dispatcher")
+                       return fmt.Errorf("failed to start time dispatcher: 
%w", err)
+               }
+       } else {
+               return fmt.Errorf("time dispatcher is not initialized")
+       }
 
-       r.Logger.Info("Starting job server")
+       r.Logger.Info("job service runner started successfully")
 
        select {
        case <-ctx.Done():
+               r.Logger.Info("job service runner stopped by context")
+               if r.timeDispatch != nil {
+                       if err := r.timeDispatch.Stop(); err != nil {
+                               r.Logger.Error(err, "error stopping time 
dispatcher")
+                       }
+               }
                return nil
        }
 }
 
+// Info returns runner information
 func (r *Runner) Info() collector.Info {
-
        return collector.Info{
-               Name: "job",
+               Name: "job-service",
        }
 }
 
+// Close closes the job service runner and all its components
 func (r *Runner) Close() error {
+       r.Logger.Info("closing job service runner")
+
+       r.cancel()
+
+       // Stop time dispatcher
+       if r.timeDispatch != nil {
+               if err := r.timeDispatch.Stop(); err != nil {
+                       r.Logger.Error(err, "error stopping time dispatcher")
+                       // Continue cleanup despite error
+               }
+       }
+
+       // Stop common dispatcher (via time dispatcher's commonDispatcher field)
+       // This is handled internally by the time dispatcher's Stop method
+
+       // Clear running jobs
+       r.mu.Lock()
+       r.runningJobs = make(map[int64]*jobtypes.Job)
+       r.mu.Unlock()

Review Comment:
   Ok, I will correct this problem. Thank you for your suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to