Copilot commented on code in PR #888:
URL:
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2603133841
##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string,
sampled bool) (err err
return nil
}
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
- if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
- s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
- return err
+func (s *streamService) validateWriteRequest(writeEntity
*streamv1.WriteRequest,
+ metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+ if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp);
errTime != nil {
+ s.l.Error().Err(errTime).Stringer("written",
writeEntity).Msg("the element time is invalid")
+ s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- return nil
-}
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ streamCache, existed := s.entityRepo.getLocator(getID(metadata))
if !existed {
- return errors.New("stream schema not found")
+ s.l.Error().Stringer("written",
writeEntity).Msg("stream schema not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
- return errors.New("expired stream schema")
+ if metadata.ModRevision != streamCache.ModRevision {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
stream schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
+ }
+ }
+
+ return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+ writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetElement().GetTagFamilies()
+ if spec == nil {
+ return s.navigateByLocator(metadata, tagFamilies)
+ }
+ return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+ metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+ shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
+ }
+ id := getID(metadata)
+ stream, ok := s.entityRepo.getStream(id)
+ if !ok {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding stream schema by: %v", metadata)
+ }
+ specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+ entityValues := s.findTagValuesByNames(
+ metadata.Name,
+ stream.GetTagFamilies(),
+ tagFamilies,
+ stream.GetEntity().GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ entity, err := entityValues.ToEntity()
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+
+ shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+ return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec)
(map[string]int, map[string]map[string]int) {
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
Review Comment:
Map initialized without capacity hint when size is known. Change
`make(map[string]int)` to `make(map[string]int, len(specFamily.GetTagNames()))`
to avoid reallocation.
```suggestion
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string,
sampled bool) (err err
return nil
}
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
- if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
- s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
- return err
+func (s *streamService) validateWriteRequest(writeEntity
*streamv1.WriteRequest,
+ metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+ if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp);
errTime != nil {
+ s.l.Error().Err(errTime).Stringer("written",
writeEntity).Msg("the element time is invalid")
+ s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- return nil
-}
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ streamCache, existed := s.entityRepo.getLocator(getID(metadata))
if !existed {
- return errors.New("stream schema not found")
+ s.l.Error().Stringer("written",
writeEntity).Msg("stream schema not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
- return errors.New("expired stream schema")
+ if metadata.ModRevision != streamCache.ModRevision {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
stream schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
+ }
+ }
+
+ return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+ writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetElement().GetTagFamilies()
+ if spec == nil {
+ return s.navigateByLocator(metadata, tagFamilies)
+ }
+ return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+ metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+ shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
+ }
+ id := getID(metadata)
+ stream, ok := s.entityRepo.getStream(id)
+ if !ok {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding stream schema by: %v", metadata)
+ }
+ specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+ entityValues := s.findTagValuesByNames(
+ metadata.Name,
+ stream.GetTagFamilies(),
+ tagFamilies,
+ stream.GetEntity().GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ entity, err := entityValues.ToEntity()
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+
+ shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+ return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec)
(map[string]int, map[string]map[string]int) {
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
Review Comment:
Maps initialized without capacity hints when size is known. Initialize
`specFamilyMap` with capacity `len(spec)` and `specTagMaps` with capacity
`len(spec)` to avoid reallocation:
```go
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
```
```suggestion
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
```
##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
e.RWMutex.Lock()
defer e.RWMutex.Unlock()
e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators,
ModRevision: modRevision}
- if schemaMetadata.Kind == schema.KindMeasure {
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
e.measureMap[id] = measure
- } else {
- delete(e.measureMap, id) // Ensure measure is not stored for
streams
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ e.streamMap[id] = stream
+ case schema.KindTrace:
+ trace := schemaMetadata.Spec.(*databasev1.Trace)
+ e.traceMap[id] = trace
Review Comment:
Trace handling is duplicated in the switch statement. The code handles `case
schema.KindTrace` twice - once at line 236-252 (with early return) and again at
line 267-269. The second case at line 267-269 is unreachable due to the early
return in the first case. Consider removing the unreachable code or
consolidating the logic.
```suggestion
```
##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int,
map[string]map[string]int) {
+ if spec == nil {
+ return nil, nil
+ }
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
Review Comment:
Maps initialized without capacity hints when size is known. Initialize
`specFamilyMap` with capacity `len(spec)` and `specTagMaps` with capacity
`len(spec)` to avoid reallocation:
```go
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
```
```suggestion
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
```
##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int,
map[string]map[string]int) {
+ if spec == nil {
+ return nil, nil
+ }
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
Review Comment:
Map initialized without capacity hint when size is known. Change
`make(map[string]int)` to `make(map[string]int, len(specFamily.GetTagNames()))`
to avoid reallocation.
```suggestion
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]