Copilot commented on code in PR #873:
URL:
https://github.com/apache/skywalking-banyandb/pull/873#discussion_r2617666218
##########
banyand/measure/topn.go:
##########
@@ -604,19 +607,21 @@ func (manager *topNProcessorManager)
buildMapper(fieldName string, groupByNames
type groupTagsLocator []partition.TagLocator
// newGroupLocator generates a groupTagsLocator which strictly preserve the
order of groupByNames.
-func newGroupLocator(m *databasev1.Measure, groupByNames []string)
(groupTagsLocator, error) {
+func newGroupLocator(m *databasev1.Measure, groupByNames []string)
(groupTagsLocator, []string) {
groupTags := make([]partition.TagLocator, 0, len(groupByNames))
+ var removedTags []string
for _, groupByName := range groupByNames {
fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(),
groupByName)
if spec == nil {
- return nil, fmt.Errorf("tag %s is not found",
groupByName)
+ removedTags = append(removedTags, groupByName)
+ continue
}
groupTags = append(groupTags, partition.TagLocator{
FamilyOffset: fIdx,
TagOffset: tIdx,
})
}
- return groupTags, nil
+ return groupTags, removedTags
Review Comment:
The function `newGroupLocator` has inconsistent error handling compared to
similar functions in the codebase. It now returns the removed tags as a string
slice instead of returning an error, but this signature change is not
documented in the function's comment. The caller in `buildMapper` logs
warnings for removed tags but continues processing, which could lead to
incorrect TopN aggregation results if critical groupBy tags are missing.
Consider returning an error if any groupBy tag is not found, as groupBy tags
are essential for the aggregation to work correctly.
##########
banyand/trace/query.go:
##########
@@ -134,6 +147,41 @@ func validateTraceQueryOptions(tqo
model.TraceQueryOptions) error {
return nil
}
+func (t *trace) filterTagProjection(tagProjection *model.TagProjection)
*model.TagProjection {
+ if tagProjection == nil || len(tagProjection.Names) == 0 {
+ return tagProjection
+ }
+
+ is := t.indexSchema.Load()
+ if is == nil {
+ return tagProjection
+ }
+ tagMap := is.(indexSchema).tagMap
+ if len(tagMap) == 0 {
+ return tagProjection
+ }
+
+ filteredNames := make([]string, 0, len(tagProjection.Names))
+ for _, name := range tagProjection.Names {
+ if _, exists := tagMap[name]; exists {
+ filteredNames = append(filteredNames, name)
+ }
+ }
+
+ if len(filteredNames) == len(tagProjection.Names) {
+ return tagProjection
+ }
+
+ if len(filteredNames) == 0 {
+ return nil
+ }
+
+ return &model.TagProjection{
+ Family: tagProjection.Family,
+ Names: filteredNames,
+ }
+}
Review Comment:
The function `filterTagProjection` has a potential concurrency issue. The
`indexSchema` is loaded atomically, but once loaded, its `tagMap` field is
accessed directly without synchronization. `atomic.Value` only guarantees that
the stored value itself is read atomically. It does not protect the map that
value refers to. If the schema's map is modified in place while this function
is executing, there could be a data race. Consider documenting that the
indexSchema must be immutable once stored, or implement copy-on-write
semantics for schema updates.
##########
banyand/metadata/schema/stream.go:
##########
@@ -90,26 +90,45 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx
context.Context, stream *databasev
})
}
-func validateEqualExceptAppendTags(prevStream, newStream *databasev1.Stream)
error {
+func validateStreamUpdate(prevStream, newStream *databasev1.Stream) error {
if prevStream.GetEntity().String() != newStream.GetEntity().String() {
return fmt.Errorf("entity is different: %s != %s",
prevStream.GetEntity().String(), newStream.GetEntity().String())
}
- if len(prevStream.GetTagFamilies()) > len(newStream.GetTagFamilies()) {
- return fmt.Errorf("number of tag families is less in the new
stream")
- }
- for i, tagFamily := range prevStream.GetTagFamilies() {
- if tagFamily.Name != newStream.GetTagFamilies()[i].Name {
- return fmt.Errorf("tag family name is different: %s !=
%s", tagFamily.Name, newStream.GetTagFamilies()[i].Name)
+
+ entityTagSet := make(map[string]struct{})
+ for _, tagName := range newStream.GetEntity().GetTagNames() {
+ entityTagSet[tagName] = struct{}{}
+ }
+ newTagFamilyMap := make(map[string]map[string]*databasev1.TagSpec)
+ for _, tf := range newStream.GetTagFamilies() {
+ tagMap := make(map[string]*databasev1.TagSpec)
+ for _, tag := range tf.GetTags() {
+ tagMap[tag.GetName()] = tag
}
- if len(tagFamily.Tags) >
len(newStream.GetTagFamilies()[i].Tags) {
- return fmt.Errorf("number of tags in tag family %s is
less in the new stream", tagFamily.Name)
+ newTagFamilyMap[tf.GetName()] = tagMap
+ }
+
+ for _, prevTagFamily := range prevStream.GetTagFamilies() {
+ newTagMap, familyExists :=
newTagFamilyMap[prevTagFamily.GetName()]
+ if !familyExists {
+ for _, tag := range prevTagFamily.GetTags() {
+ if _, isEntity := entityTagSet[tag.GetName()];
isEntity {
+ return fmt.Errorf("cannot delete tag
family %s: it contains entity tag %s", prevTagFamily.GetName(), tag.GetName())
+ }
+ }
+ continue
}
Review Comment:
The validation logic for stream updates allows deleting entire tag families
if they don't contain entity tags. However, nothing validates that, after
deletion, the stream still has at least one tag family or that required tag
families (such as "data") are preserved. This could result in streams with
no queryable data after schema updates.
##########
banyand/metadata/schema/measure.go:
##########
@@ -139,30 +139,56 @@ func validateEqualExceptAppendTagsAndFields(prevMeasure,
newMeasure *databasev1.
if prevMeasure.GetIndexMode() != newMeasure.GetIndexMode() {
return fmt.Errorf("index mode is different: %v != %v",
prevMeasure.GetIndexMode(), newMeasure.GetIndexMode())
}
- if len(prevMeasure.GetTagFamilies()) > len(newMeasure.GetTagFamilies())
{
- return fmt.Errorf("number of tag families is less in the new
measure")
- }
- if len(prevMeasure.GetFields()) > len(newMeasure.GetFields()) {
- return fmt.Errorf("number of fields is less in the new measure")
- }
- for i, tagFamily := range prevMeasure.GetTagFamilies() {
- if tagFamily.Name != newMeasure.GetTagFamilies()[i].Name {
- return fmt.Errorf("tag family name is different: %s !=
%s", tagFamily.Name, newMeasure.GetTagFamilies()[i].Name)
+
+ entityTagSet := make(map[string]struct{})
+ for _, tagName := range newMeasure.GetEntity().GetTagNames() {
+ entityTagSet[tagName] = struct{}{}
+ }
+ newTagFamilyMap := make(map[string]map[string]*databasev1.TagSpec)
+ for _, tf := range newMeasure.GetTagFamilies() {
+ tagMap := make(map[string]*databasev1.TagSpec)
+ for _, tag := range tf.GetTags() {
+ tagMap[tag.GetName()] = tag
}
- if len(tagFamily.Tags) >
len(newMeasure.GetTagFamilies()[i].Tags) {
- return fmt.Errorf("number of tags in tag family %s is
less in the new measure", tagFamily.Name)
+ newTagFamilyMap[tf.GetName()] = tagMap
+ }
+
+ for _, prevTagFamily := range prevMeasure.GetTagFamilies() {
+ newTagMap, familyExists :=
newTagFamilyMap[prevTagFamily.GetName()]
+ if !familyExists {
+ for _, tag := range prevTagFamily.GetTags() {
+ if _, isEntity := entityTagSet[tag.GetName()];
isEntity {
+ return fmt.Errorf("cannot delete tag
family %s: it contains entity tag %s", prevTagFamily.GetName(), tag.GetName())
+ }
+ }
+ continue
}
Review Comment:
The validation logic for measure updates allows deleting entire tag families
if they don't contain entity tags. As with streams, nothing validates that,
after deletion, the measure still has at least one tag family. This could
result in measures with an incomplete schema after updates.
##########
banyand/stream/block.go:
##########
@@ -526,8 +534,13 @@ func (bc *blockCursor) copyTo(r *model.StreamResult) {
logger.Panicf("unexpected number of tags: got %d; want
%d", len(r.TagFamilies[i].Tags), len(bc.tagProjection[i].Names))
}
for i2, c := range cf.tags {
+ schemaType, hasSchemaType := bc.schemaTagTypes[c.name]
if len(c.values) > bc.idx {
- r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType,
c.values[bc.idx]))
+ if !hasSchemaType || c.valueType == schemaType {
+ r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType,
c.values[bc.idx]))
+ } else {
+ r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
+ }
} else {
r.TagFamilies[i].Tags[i2].Values =
append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
}
Review Comment:
The loop variable name `i2` is unclear and doesn't follow Go naming
conventions. Consider renaming it to `tagIdx` or `j` for better readability, as
it represents the index of a tag within a tag family.
##########
pkg/pb/v1/value.go:
##########
@@ -89,6 +89,26 @@ func MustTagValueSpecToValueType(tag databasev1.TagType)
ValueType {
}
}
+// TagValueSpecToValueType converts databasev1.TagType to ValueType.
+func TagValueSpecToValueType(tag databasev1.TagType) ValueType {
+ switch tag {
+ case databasev1.TagType_TAG_TYPE_STRING:
+ return ValueTypeStr
+ case databasev1.TagType_TAG_TYPE_INT:
+ return ValueTypeInt64
+ case databasev1.TagType_TAG_TYPE_DATA_BINARY:
+ return ValueTypeBinaryData
+ case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
+ return ValueTypeStrArr
+ case databasev1.TagType_TAG_TYPE_INT_ARRAY:
+ return ValueTypeInt64Arr
+ case databasev1.TagType_TAG_TYPE_TIMESTAMP:
+ return ValueTypeTimestamp
+ default:
+ return ValueTypeUnknown
+ }
+}
Review Comment:
The function `TagValueSpecToValueType` duplicates the functionality of
`MustTagValueSpecToValueType`. The only difference is that this function
returns `ValueTypeUnknown` for unknown types, while `MustTagValueSpecToValueType`
panics. Consider adding a comment explaining when to use this non-panicking
version versus the panicking version, or consolidate them into a single
function with a boolean return value indicating success.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]