Copilot commented on code in PR #888:
URL: 
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2603133841


##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string, 
sampled bool) (err err
        return nil
 }
 
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
-       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
-               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
-               return err
+func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
+       metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+       if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); 
errTime != nil {
+               s.l.Error().Err(errTime).Stringer("written", 
writeEntity).Msg("the element time is invalid")
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
-       return nil
-}
 
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               streamCache, existed := s.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       return errors.New("stream schema not found")
+                       s.l.Error().Stringer("written", 
writeEntity).Msg("stream schema not found")
+                       s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       return errors.New("expired stream schema")
+               if metadata.ModRevision != streamCache.ModRevision {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
+               }
+       }
+
+       return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+       writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetElement().GetTagFamilies()
+       if spec == nil {
+               return s.navigateByLocator(metadata, tagFamilies)
+       }
+       return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+       metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec, 
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+       shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+       if !existed {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
+       }
+       id := getID(metadata)
+       stream, ok := s.entityRepo.getStream(id)
+       if !ok {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding stream schema by: %v", metadata)
+       }
+       specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+       entityValues := s.findTagValuesByNames(
+               metadata.Name,
+               stream.GetTagFamilies(),
+               tagFamilies,
+               stream.GetEntity().GetTagNames(),
+               specFamilyMap,
+               specTagMaps,
+       )
+       entity, err := entityValues.ToEntity()
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+
+       shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+       return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec) 
(map[string]int, map[string]map[string]int) {
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)

Review Comment:
   The map is initialized without a capacity hint even though its size is 
known. Change `make(map[string]int)` to 
`make(map[string]int, len(specFamily.GetTagNames()))` to avoid reallocation.
   ```suggestion
                tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```



##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string, 
sampled bool) (err err
        return nil
 }
 
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
-       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
-               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
-               return err
+func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
+       metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+       if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); 
errTime != nil {
+               s.l.Error().Err(errTime).Stringer("written", 
writeEntity).Msg("the element time is invalid")
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
-       return nil
-}
 
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               streamCache, existed := s.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       return errors.New("stream schema not found")
+                       s.l.Error().Stringer("written", 
writeEntity).Msg("stream schema not found")
+                       s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       return errors.New("expired stream schema")
+               if metadata.ModRevision != streamCache.ModRevision {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
+               }
+       }
+
+       return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+       writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetElement().GetTagFamilies()
+       if spec == nil {
+               return s.navigateByLocator(metadata, tagFamilies)
+       }
+       return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+       metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec, 
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+       shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+       if !existed {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
+       }
+       id := getID(metadata)
+       stream, ok := s.entityRepo.getStream(id)
+       if !ok {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding stream schema by: %v", metadata)
+       }
+       specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+       entityValues := s.findTagValuesByNames(
+               metadata.Name,
+               stream.GetTagFamilies(),
+               tagFamilies,
+               stream.GetEntity().GetTagNames(),
+               specFamilyMap,
+               specTagMaps,
+       )
+       entity, err := entityValues.ToEntity()
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+
+       shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+       return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec) 
(map[string]int, map[string]map[string]int) {
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)

Review Comment:
   Both maps are initialized without capacity hints even though their sizes 
are known. Initialize `specFamilyMap` and `specTagMaps` with capacity 
`len(spec)` each to avoid reallocation:
   ```go
   specFamilyMap := make(map[string]int, len(spec))
   specTagMaps := make(map[string]map[string]int, len(spec))
   ```
   ```suggestion
        specFamilyMap := make(map[string]int, len(spec))
        specTagMaps := make(map[string]map[string]int, len(spec))
   ```



##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
-       if schemaMetadata.Kind == schema.KindMeasure {
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
                e.measureMap[id] = measure
-       } else {
-               delete(e.measureMap, id) // Ensure measure is not stored for 
streams
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               e.streamMap[id] = stream
+       case schema.KindTrace:
+               trace := schemaMetadata.Spec.(*databasev1.Trace)
+               e.traceMap[id] = trace

Review Comment:
   Trace handling is duplicated in the switch statement. The code handles `case 
schema.KindTrace` twice: once at lines 236-252 (with an early return) and again 
at lines 267-269. The second case at lines 267-269 is unreachable due to the 
early return in the first case. Consider removing the unreachable code or 
consolidating the logic.
   ```suggestion
   
   ```



##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int, 
map[string]map[string]int) {
+       if spec == nil {
+               return nil, nil
+       }
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)

Review Comment:
   Both maps are initialized without capacity hints even though their sizes 
are known. Initialize `specFamilyMap` and `specTagMaps` with capacity 
`len(spec)` each to avoid reallocation:
   ```go
   specFamilyMap := make(map[string]int, len(spec))
   specTagMaps := make(map[string]map[string]int, len(spec))
   ```
   ```suggestion
        specFamilyMap := make(map[string]int, len(spec))
        specTagMaps := make(map[string]map[string]int, len(spec))
   ```



##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int, 
map[string]map[string]int) {
+       if spec == nil {
+               return nil, nil
+       }
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)

Review Comment:
   The map is initialized without a capacity hint even though its size is 
known. Change `make(map[string]int)` to 
`make(map[string]int, len(specFamily.GetTagNames()))` to avoid reallocation.
   ```suggestion
                tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to