fuziye01 commented on a change in pull request #773: URL: https://github.com/apache/servicecomb-service-center/pull/773#discussion_r535905431
########## File path: syncer/server/server.go ########## @@ -313,96 +305,51 @@ func (s *Server) addToQueue(event *dump.WatchInstanceChangedEvent) { } } - s.queueLock.Lock() - s.eventQueue = append(s.eventQueue, event) - log.Debugf("success add instance event to queue:%s len:%s", event, len(s.eventQueue)) - s.queueLock.Unlock() -} - -func instFromOtherSC(instance *dump.Instance, m *pb.MappingEntry) bool { - if instance.Value.InstanceId == m.CurInstanceID && m.OrgInstanceID != "" { - return true - } - return false -} - -func (s *Server) getRevision(addr string) int64 { - s.mapLock.RLock() - value, ok := s.revisionMap[addr] - s.mapLock.RUnlock() - if ok { - return value.revision + for _, ch := range s.channelMap { + select { + case ch <- event: + log.Info("add event to queue") + default: + log.Info("channel buffer is full") + } } - return -1 } -func (s *Server) getAction(addr string) string { - s.mapLock.RLock() - value, ok := s.revisionMap[addr] - s.mapLock.RUnlock() +func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) { + ch, ok := s.channelMap[addr] + var length int64 if ok { - return value.action + length = int64(len(ch)) + } else { + length = 0 + log.Error("fail to find the specific channel according to the addr", utils.ErrChannelSearch) } - return "" -} - -func (s *Server) getSyncDataLength(addr string) (response *pb.DeclareResponse) { response = &pb.DeclareResponse{ - SyncDataLength: int64(len(s.GetIncrementQueue(addr))), + SyncDataLength: length, } return response } -func (s *Server) updateRevisionMap(addr string, incrementQueue []*dump.WatchInstanceChangedEvent) { - if len(incrementQueue) == 0 { - log.Info("incrementQueue is empty, no need to update RevisionMap") - return - } - - log.Debug(fmt.Sprintf("update RevisionMap, addr = %s", addr)) - s.mapLock.Lock() - s.revisionMap[addr] = record{ - incrementQueue[len(incrementQueue)-1].Revision, - incrementQueue[len(incrementQueue)-1].Action, - } - s.mapLock.Unlock() -} - -func (s *Server) GetIncrementQueue(addr string) []*dump.WatchInstanceChangedEvent { - revision := s.getRevision(addr) - action := s.getAction(addr) - - s.queueLock.RLock() - defer s.queueLock.RUnlock() - - length := len(s.eventQueue) - if length == 0 { - log.Info("eventQueue is empty") +func (s *Server) GetIncrementQueue(addr string) (queue []*dump.WatchInstanceChangedEvent) { + ch, ok := s.channelMap[addr] + if !ok { + log.Debug("fail to find the queue according to the addr") return nil } - if revision == -1 { - return s.eventQueue - } - - index := 0 - for _, event := range s.eventQueue { - if event.Revision == revision && event.Action == action { - break + queue = make([]*dump.WatchInstanceChangedEvent, 0, len(ch)) + for { + select { + case temp, ok := <-ch: + if !ok { + log.Debug("channel closed") + return + } + queue = append(queue, temp) Review comment: 此处取出的长度,是否与syncDatalength长度相同? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org