This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch index-int in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 46a6c82b9713142d68483145b8a3d366f4fc249d Merge: bb43e206 06ae0cf3 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Dec 3 20:02:27 2024 +0800 Merge remote-tracking branch 'origin/main' into index-int CHANGES.md | 3 + banyand/internal/storage/index.go | 64 ++++---- banyand/internal/storage/index_test.go | 3 +- banyand/internal/storage/storage.go | 10 +- banyand/measure/block.go | 29 +++- banyand/measure/block_test.go | 2 +- banyand/measure/column.go | 6 + banyand/measure/measure_suite_test.go | 5 + banyand/measure/metadata_test.go | 241 ++++++++++++++++++++++++++++- banyand/measure/query.go | 67 ++++---- banyand/measure/write.go | 3 +- banyand/metadata/schema/error.go | 2 + banyand/metadata/schema/measure.go | 14 +- banyand/metadata/schema/stream.go | 18 ++- banyand/stream/block.go | 53 ++++--- banyand/stream/block_test.go | 2 +- banyand/stream/metadata_test.go | 205 +++++++++++++++++++++++- banyand/stream/stream_suite_test.go | 6 + banyand/stream/tag.go | 6 + banyand/stream/write.go | 26 +++- bydbctl/internal/cmd/measure_test.go | 6 +- bydbctl/internal/cmd/stream_test.go | 4 +- dist/LICENSE | 2 +- docs/interacting/bydbctl/schema/measure.md | 2 + docs/interacting/bydbctl/schema/stream.md | 2 + go.mod | 2 +- go.sum | 4 +- pkg/index/index.go | 48 +++--- pkg/index/inverted/inverted.go | 2 + pkg/index/inverted/inverted_series.go | 52 +++++-- pkg/index/inverted/inverted_series_test.go | 22 +-- pkg/query/logical/expr_literal.go | 12 +- pkg/query/logical/stream/index_filter.go | 3 +- 33 files changed, 745 insertions(+), 181 deletions(-) diff --cc CHANGES.md index 85b36cba,f029a6ee..13753a46 --- a/CHANGES.md +++ b/CHANGES.md @@@ -10,6 -10,6 +10,7 @@@ Release Notes - Index: Remove sortable field from the stored field. If a field is sortable only, it won't be stored. - Index: Support InsertIfAbsent functionality which ensures documents are only inserted if their docIDs are not already present in the current index. There is a exception for the documents with extra index fields more than the entity's index fields. - Measure: Introduce "index_mode" to save data exclusively in the series index, ideal for non-timeseries measures. ++- Index: Use numeric index type to support Int and Float ### Bug Fixes diff --cc banyand/measure/metadata_test.go index d07b091d,494ac5c2..329d72cf --- a/banyand/measure/metadata_test.go +++ b/banyand/measure/metadata_test.go @@@ -103,8 -112,24 +112,9 @@@ var _ = Describe("Metadata", func() }).WithTimeout(flags.EventuallyTimeout).Should(MatchError(measure.ErrMeasureNotExist)) }) -- Context("Update a measure", func() { - var measureSchema *databasev1.Measure - - BeforeEach(func() { - var err error - measureSchema, err = svcs.metadataService.MeasureRegistry().GetMeasure(context.TODO(), &commonv1.Metadata{ - Name: "service_cpm_minute", - Group: "sw_metric", - }) - - Expect(err).ShouldNot(HaveOccurred()) - Expect(measureSchema).ShouldNot(BeNil()) - }) - }) - + Context("Add tags and fields to the measure", func() { var measureSchema *databasev1.Measure + size := 3 BeforeEach(func() { var err error @@@ -135,9 -166,130 +151,130 @@@ return false } - return newEntityTag == val.GetSchema().Entity.TagNames[0] + return len(val.GetSchema().TagFamilies[0].Tags) == 3 && len(val.GetSchema().Fields) == 3 }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) }) + + When("a tag in the data file", func() { + It("returns nil for the new added tags", func() { + dp := queryAllMeasurements(svcs, size, []string{"new_tag"}, nil) + for i := range dp { + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetValue()).Should(BeAssignableToTypeOf(&modelv1.TagValue_Null{})) + } + }) + It("get new values for the new added tags", func() { + writeData(svcs, size, []string{"test1", "test2", "test3"}, nil) + dp := queryAllMeasurements(svcs, size*2, []string{"new_tag"}, nil) + for i := 0; i < size; i++ { + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetValue()).Should(BeAssignableToTypeOf(&modelv1.TagValue_Null{})) + } + for i := size; i < size*2; i++ { + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetStr().Value).Should(Equal("test" + strconv.Itoa(i%3+1))) + } + }) + }) + When("a field in the data file", func() { + It("returns nil for the new added fields", func() { + dp := queryAllMeasurements(svcs, size, nil, []string{"new_field"}) + for i := range dp { + Expect(dp[i].Fields[2].Name).Should(Equal("new_field")) + Expect(dp[i].Fields[2].Value.GetValue()).Should(BeAssignableToTypeOf(&modelv1.FieldValue_Null{})) + } + }) + It("get new values for the new added fields", func() { + writeData(svcs, size, nil, []int{1, 2, 3}) + dp := queryAllMeasurements(svcs, size*2, nil, []string{"new_field"}) + for i := 0; i < size; i++ { + Expect(dp[i].Fields[2].Name).Should(Equal("new_field")) + Expect(dp[i].Fields[2].Value.GetValue()).Should(BeAssignableToTypeOf(&modelv1.FieldValue_Null{})) + } + for i := size; i < size*2; i++ { + Expect(dp[i].Fields[2].Name).Should(Equal("new_field")) + Expect(dp[i].Fields[2].Value.GetInt().Value).Should(Equal(int64(i%3 + 1))) + } + }) + }) + When("a tag in the index file", func() { + BeforeEach(func() { + indexRule := &databasev1.IndexRule{ + Metadata: &commonv1.Metadata{ + Name: "new_tag", + Group: "sw_metric", + }, + Tags: []string{"new_tag"}, + Type: databasev1.IndexRule_TYPE_INVERTED, + } + err := svcs.metadataService.IndexRuleRegistry().CreateIndexRule(context.TODO(), indexRule) + Expect(err).ShouldNot(HaveOccurred()) + indexRuleBinding := &databasev1.IndexRuleBinding{ + Metadata: &commonv1.Metadata{ + Name: "service_cpm_minute_new_tag", + Group: "sw_metric", + }, + Subject: &databasev1.Subject{ + Name: "service_cpm_minute", + Catalog: commonv1.Catalog_CATALOG_MEASURE, + }, + Rules: []string{"new_tag"}, + BeginAt: timestamppb.New(timestamp.NowMilli().Add(-time.Hour)), + ExpireAt: timestamppb.New(timestamp.NowMilli().Add(time.Hour)), + } + err = svcs.metadataService.IndexRuleBindingRegistry().CreateIndexRuleBinding(context.TODO(), indexRuleBinding) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() bool { + val, err := svcs.measure.Measure(&commonv1.Metadata{ + Name: "service_cpm_minute", + Group: "sw_metric", + }) + if err != nil { + return false + } + rr := val.GetIndexRules() + for i := range rr { + if rr[i].Metadata.Name == "new_tag" { + return true + } + } + return false + }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) + }) + + It("returns nil for the new added tags", func() { + dp := queryAllMeasurements(svcs, size, []string{"new_tag"}, nil) + for i := range dp { + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetValue()).Should(BeAssignableToTypeOf(&modelv1.TagValue_Null{})) + } + }) + + It("get new values for the new added tags", func() { - writeData(svcs, size, []string{"test1", "test2", "test3"}, nil) + Eventually(func() bool { ++ writeData(svcs, size, []string{"test1", "test2", "test3"}, nil) + dp := queryAllMeasurements(svcs, size*2, []string{"new_tag"}, nil) + for i := 0; i < size; i++ { + if dp[i].TagFamilies[0].Tags[2].Value.GetStr() == nil { + GinkgoWriter.Printf("actual: %s", dp[i].TagFamilies[0].Tags[2]) + return false + } + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetStr().Value).Should(Equal("test" + strconv.Itoa(i+1))) + } + for i := size; i < size*2; i++ { + Expect(dp[i].TagFamilies[0].Tags[2].Key).Should(Equal("new_tag")) + if dp[i].TagFamilies[0].Tags[2].Value.GetStr() == nil { + GinkgoWriter.Printf("actual: %s", dp[i].TagFamilies[0].Tags[2]) + return false + } + Expect(dp[i].TagFamilies[0].Tags[2].Value.GetStr().Value).Should(Equal("test" + strconv.Itoa(i%3+1))) + } + return true + }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue()) + }) + }) }) }) }) diff --cc banyand/stream/write.go index 66ff64d8,12ee5af4..9fa2c431 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@@ -165,12 -165,34 +165,12 @@@ func (w *writeCallback) handle(dst map[ } t := tagFamilySpec.Tags[j] - encodeTagValue := encodeTagValue( - t.Name, - t.Type, - tagValue) -- if r, ok := tfr[t.Name]; ok { - if encodeTagValue.value != nil { - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: r.GetMetadata().GetId(), - Analyzer: r.Analyzer, - SeriesID: series.ID, - }, - Term: encodeTagValue.value, - NoSort: r.GetNoSort(), - }) - } else { - for _, val := range encodeTagValue.valueArr { - fields = append(fields, index.Field{ - Key: index.FieldKey{ - IndexRuleID: r.GetMetadata().GetId(), - Analyzer: r.Analyzer, - SeriesID: series.ID, - }, - Term: val, - NoSort: r.GetNoSort(), - }) - } - } ++ if r, ok := tfr[t.Name]; ok && tagValue != pbv1.NullTagValue { + fields = appendField(fields, index.FieldKey{ + IndexRuleID: r.GetMetadata().GetId(), + Analyzer: r.Analyzer, + SeriesID: series.ID, + }, t.Type, tagValue, r.GetNoSort()) } _, isEntity := stm.indexRuleLocators.EntitySet[tagFamilySpec.Tags[j].Name] if tagFamilySpec.Tags[j].IndexedOnly || isEntity { @@@ -300,35 -319,3 +300,53 @@@ func encodeTagValue(name string, tagTyp } return tv } + +func appendField(dest []index.Field, fieldKey index.FieldKey, tagType databasev1.TagType, tagVal *modelv1.TagValue, noSort bool) []index.Field { + switch tagType { + case databasev1.TagType_TAG_TYPE_INT: - f := index.NewIntField(fieldKey, tagVal.GetInt().Value) ++ v := tagVal.GetInt() ++ if v == nil { ++ return dest ++ } ++ f := index.NewIntField(fieldKey, v.Value) + f.NoSort = noSort + dest = append(dest, f) + case databasev1.TagType_TAG_TYPE_STRING: - f := index.NewStringField(fieldKey, tagVal.GetStr().Value) ++ v := tagVal.GetStr() ++ if v == nil { ++ return dest ++ } ++ f := index.NewStringField(fieldKey, v.Value) + f.NoSort = noSort + dest = append(dest, f) + case databasev1.TagType_TAG_TYPE_DATA_BINARY: - f := index.NewBytesField(fieldKey, tagVal.GetBinaryData()) ++ v := tagVal.GetBinaryData() ++ if v == nil { ++ return dest ++ } ++ f := index.NewBytesField(fieldKey, v) + f.NoSort = noSort + dest = append(dest, f) + case databasev1.TagType_TAG_TYPE_INT_ARRAY: ++ if tagVal.GetIntArray() == nil { ++ return dest ++ } + for i := range tagVal.GetIntArray().Value { + f := index.NewIntField(fieldKey, tagVal.GetIntArray().Value[i]) + f.NoSort = noSort + dest = append(dest, f) + } + case databasev1.TagType_TAG_TYPE_STRING_ARRAY: ++ if tagVal.GetStrArray() == nil { ++ return dest ++ } + for i := range tagVal.GetStrArray().Value { + f := index.NewStringField(fieldKey, tagVal.GetStrArray().Value[i]) + f.NoSort = noSort + dest = append(dest, f) + } + default: + logger.Panicf("unsupported tag value type: %T", tagVal.GetValue()) + } + return dest +} diff --cc dist/LICENSE index f90ac0ca,b48af084..8d373f94 --- a/dist/LICENSE +++ b/dist/LICENSE @@@ -178,7 -178,7 +178,7 @@@ Apache-2.0 licenses ======================================================================== - github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201 Apache-2.0 - github.com/SkyAPM/bluge v0.0.0-20241202124911-7db485ccafc7 Apache-2.0 ++ github.com/SkyAPM/bluge v0.0.0-20241204041252-279547fc72c6 Apache-2.0 github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 Apache-2.0 github.com/apache/skywalking-cli v0.0.0-20240227151024-ee371a210afe Apache-2.0 github.com/blevesearch/segment v0.9.1 Apache-2.0 diff --cc go.mod index 57454f71,716d004b..09b2729c --- a/go.mod +++ b/go.mod @@@ -157,7 -157,7 +157,7 @@@ require replace ( github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 - github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201 - github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20241202124911-7db485ccafc7 ++ github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20241204041252-279547fc72c6 github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0 github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 ) diff --cc go.sum index 5c140a3b,8e2b6904..18e3dffd --- a/go.sum +++ b/go.sum @@@ -6,8 -6,8 +6,8 @@@ github.com/OneOfOne/xxhash v1.2.2/go.mo github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= - github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201 h1:QX/WvtL8j5Zrbs68EVEiOE2nFQSvoT5oTkOFh2uNSpg= - github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201/go.mod h1:6o9wC3xO3qb5Q7VmD1x0r54qQBDpO9+ghGAQvuOHsCU= -github.com/SkyAPM/bluge v0.0.0-20241202124911-7db485ccafc7 h1:2I3tM57k+g1j3YbJT+WH3u741iHFcW6rXFUVjbLtrrw= -github.com/SkyAPM/bluge v0.0.0-20241202124911-7db485ccafc7/go.mod h1:6o9wC3xO3qb5Q7VmD1x0r54qQBDpO9+ghGAQvuOHsCU= ++github.com/SkyAPM/bluge v0.0.0-20241204041252-279547fc72c6 h1:Zwv1x7CUnX1tBqfTmBzM2tAOROMi4ZHJtdbkuBEpHfo= ++github.com/SkyAPM/bluge v0.0.0-20241204041252-279547fc72c6/go.mod h1:6o9wC3xO3qb5Q7VmD1x0r54qQBDpO9+ghGAQvuOHsCU= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44= github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4= diff --cc pkg/index/index.go index 9cbe4608,c13c2877..855e4828 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@@ -23,9 -23,7 +23,11 @@@ import "context" "fmt" "io" + "math" + "strconv" + ++ "github.com/blugelabs/bluge/numeric" + "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@@ -70,182 -62,50 +71,178 @@@ func (f FieldKey) Marshal() string return string(convert.Uint32ToBytes(f.IndexRuleID)) } ++// NewStringField creates a new string field. +func NewStringField(key FieldKey, value string) Field { + return Field{ + term: &BytesTermValue{Value: convert.StringToBytes(value)}, + Key: key, + } +} + ++// NewIntField creates a new int field. +func NewIntField(key FieldKey, value int64) Field { + return Field{ + term: &FloatTermValue{Value: numeric.Int64ToFloat64(value)}, + Key: key, + } +} + ++// NewBytesField creates a new bytes field. +func NewBytesField(key FieldKey, value []byte) Field { + return Field{ + term: &BytesTermValue{Value: bytes.Clone(value)}, + Key: key, + } +} + - func NewFloatField(key FieldKey, value float64) Field { - return Field{ - term: &FloatTermValue{Value: value}, - Key: key, - } - } - - func IntToStr(i int64) string { - return strconv.FormatFloat(numeric.Int64ToFloat64(i), 'f', -1, 64) - } - // Field is a indexed item in a document. type Field struct { - Term []byte + term IsTermValue Key FieldKey NoSort bool Store bool Index bool } ++// GetTerm returns the term value of the field. +func (f *Field) GetTerm() IsTermValue { + return f.term +} + ++// GetBytes returns the byte value of the field. +func (f *Field) GetBytes() []byte { + if bv, ok := f.GetTerm().(*BytesTermValue); ok { + return bv.Value + } + panic("field is not bytes") +} + ++// GetFloat returns the float value of the field. +func (f *Field) GetFloat() float64 { + if fv, ok := f.GetTerm().(*FloatTermValue); ok { + return fv.Value + } + panic("field is not float") +} + ++// String returns a string representation of the field. +func (f *Field) String() string { + return fmt.Sprintf("{\"key\": \"%s\", \"term\": %s}", f.Key.Marshal(), f.term) +} + ++// MarshalJSON encodes f to JSON. +func (f *Field) MarshalJSON() ([]byte, error) { + return []byte(f.String()), nil +} + ++// IsTermValue is the interface for term value. +type IsTermValue interface { + isTermValue() + String() string +} + ++// BytesTermValue represents a byte term value. +type BytesTermValue struct { + Value []byte +} + +func (BytesTermValue) isTermValue() {} + +func (b BytesTermValue) String() string { + return convert.BytesToString(b.Value) +} + ++// FloatTermValue represents a float term value. +type FloatTermValue struct { + Value float64 +} + +func (FloatTermValue) isTermValue() {} + +func (f FloatTermValue) String() string { + return strconv.FormatInt(numeric.Float64ToInt64(f.Value), 10) +} + // RangeOpts contains options to performance a continuous scan. type RangeOpts struct { - Upper []byte - Lower []byte + Upper IsTermValue + Lower IsTermValue IncludesUpper bool IncludesLower bool } -// Between reports whether value is in the range. -func (r RangeOpts) Between(value []byte) int { - if r.Upper != nil { - var in bool - if r.IncludesUpper { - in = bytes.Compare(r.Upper, value) >= 0 - } else { - in = bytes.Compare(r.Upper, value) > 0 - } - if !in { - return 1 - } ++// IsEmpty returns true if the range is empty. +func (r RangeOpts) IsEmpty() bool { + return r.Upper == nil && r.Lower == nil +} + ++// Valid returns true if the range is valid. +func (r RangeOpts) Valid() bool { + if r.Upper == nil || r.Lower == nil { + return false } - switch r.Upper.(type) { - if r.Lower != nil { - var in bool - if r.IncludesLower { - in = bytes.Compare(r.Lower, value) <= 0 - } else { - in = bytes.Compare(r.Lower, value) < 0 ++ switch upper := r.Upper.(type) { + case *BytesTermValue: - if bytes.Compare(r.Lower.(*BytesTermValue).Value, r.Upper.(*BytesTermValue).Value) > 0 { ++ if bytes.Compare(r.Lower.(*BytesTermValue).Value, upper.Value) > 0 { + return false } - if !in { - return -1 + case *FloatTermValue: - if r.Lower.(*FloatTermValue).Value > r.Upper.(*FloatTermValue).Value { ++ if r.Lower.(*FloatTermValue).Value > upper.Value { + return false } + default: + return false + } + return true +} + ++// NewStringRangeOpts creates a new string range option. +func NewStringRangeOpts(lower, upper string, includesLower, includesUpper bool) RangeOpts { + var upperBytes, lowerBytes []byte + if len(upper) == 0 { + upperBytes = defaultUpper + } else { + upperBytes = convert.StringToBytes(upper) + } + if len(lower) == 0 { + lowerBytes = defaultLower + } else { + lowerBytes = convert.StringToBytes(lower) + } + return RangeOpts{ + Lower: &BytesTermValue{Value: upperBytes}, + Upper: &BytesTermValue{Value: lowerBytes}, + IncludesLower: includesLower, + IncludesUpper: includesUpper, + } +} + ++// NewIntRangeOpts creates a new int range option. +func NewIntRangeOpts(lower, upper int64, includesLower, includesUpper bool) RangeOpts { + return RangeOpts{ + Lower: &FloatTermValue{Value: numeric.Int64ToFloat64(lower)}, + Upper: &FloatTermValue{Value: numeric.Int64ToFloat64(upper)}, + IncludesLower: includesLower, + IncludesUpper: includesUpper, + } +} + ++// NewBytesRangeOpts creates a new bytes range option. +func NewBytesRangeOpts(lower, upper []byte, includesLower, includesUpper bool) RangeOpts { + if len(upper) == 0 { + upper = defaultUpper + } + if len(lower) == 0 { + lower = defaultLower + } + return RangeOpts{ + Lower: &BytesTermValue{Value: bytes.Clone(lower)}, + Upper: &BytesTermValue{Value: bytes.Clone(upper)}, + IncludesLower: includesLower, + IncludesUpper: includesUpper, } - return 0 } - func NewFloatRangeOpts(lower, upper float64, includesLower, includesUpper bool) RangeOpts { - return RangeOpts{ - Lower: &FloatTermValue{Value: lower}, - Upper: &FloatTermValue{Value: upper}, - IncludesLower: includesLower, - IncludesUpper: includesUpper, - } - } - // DocumentResult represents a document in an index. type DocumentResult struct { EntityValues []byte diff --cc pkg/index/inverted/inverted_series.go index 32b6d3df,a13c10b2..34b77925 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@@ -72,23 -72,33 +72,33 @@@ func (s *store) UpdateSeriesBatch(batc return s.writer.Batch(b) } - func toDoc(d index.Document) *bluge.Document { + func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) { doc := bluge.NewDocument(convert.BytesToString(d.EntityValues)) + var fieldNames []string + if toParseFieldNames && len(d.Fields) > 0 { + fieldNames = make([]string, 0, len(d.Fields)) + } for _, f := range d.Fields { - tf := bluge.NewKeywordFieldBytes(f.Key.Marshal(), f.GetBytes()) - if !f.Index { - tf.FieldOptions = 0 - } else if !f.NoSort { - tf.Sortable() - } - - if f.Store { - tf.StoreValue() - } - if f.Key.Analyzer != index.AnalyzerUnspecified { - tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) + var tf *bluge.TermField + k := f.Key.Marshal() + if f.Index { - tf = bluge.NewKeywordFieldBytes(k, f.Term) ++ tf = bluge.NewKeywordFieldBytes(k, f.GetBytes()) + if f.Store { + tf.StoreValue() + } + if !f.NoSort { + tf.Sortable() + } + if f.Key.Analyzer != index.AnalyzerUnspecified { + tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) + } + } else { - tf = bluge.NewStoredOnlyField(k, f.Term) ++ tf = bluge.NewStoredOnlyField(k, f.GetBytes()) } doc.AddField(tf) + if fieldNames != nil { + fieldNames = append(fieldNames, k) + } } if d.Timestamp > 0 { diff --cc pkg/index/inverted/inverted_series_test.go index 1b3038f6,5f3f384c..afd0c1fd --- a/pkg/index/inverted/inverted_series_test.go +++ b/pkg/index/inverted/inverted_series_test.go @@@ -904,10 -904,32 +904,10 @@@ func generateDocs() (index.Batch, index series2 := index.Document{ EntityValues: []byte("test2"), Fields: []index.Field{ - field(fieldKeyDuration, convert.Int64ToBytes(100), true, true), - field(fieldKeyServiceName, []byte("svc2"), true, true), - field(fieldKeyStartTime, convert.Int64ToBytes(100), true, true), - field(index.FieldKey{TagName: "short_name"}, []byte("t2"), true, false), - { - Key: fieldKeyDuration, - Term: convert.Int64ToBytes(int64(100)), - Store: true, - Index: true, - }, - { - Key: fieldKeyServiceName, - Term: []byte("svc2"), - Store: true, - Index: true, - }, - { - Key: fieldKeyStartTime, - Term: convert.Int64ToBytes(int64(100)), - Store: true, - Index: true, - }, - { - Key: index.FieldKey{ - TagName: "short_name", - }, - Term: []byte("t2"), - Store: true, - Index: false, - }, ++ field(fieldKeyDuration, convert.Int64ToBytes(100), true), ++ field(fieldKeyServiceName, []byte("svc2"), true), ++ field(fieldKeyStartTime, convert.Int64ToBytes(100), true), ++ field(index.FieldKey{TagName: "short_name"}, []byte("t2"), false), }, Timestamp: int64(101), } @@@ -915,17 -937,44 +915,17 @@@ series3 := index.Document{ EntityValues: []byte("test3"), Fields: []index.Field{ - field(fieldKeyDuration, convert.Int64ToBytes(500), true, true), - field(fieldKeyStartTime, convert.Int64ToBytes(1000), true, true), - field(index.FieldKey{TagName: "short_name"}, []byte("t3"), true, false), - { - Key: fieldKeyDuration, - Term: convert.Int64ToBytes(int64(500)), - Store: true, - Index: true, - }, - { - Key: fieldKeyStartTime, - Term: convert.Int64ToBytes(int64(1000)), - Store: true, - Index: true, - }, - { - Key: index.FieldKey{ - TagName: "short_name", - }, - Term: []byte("t3"), - Store: true, - Index: false, - }, ++ field(fieldKeyDuration, convert.Int64ToBytes(500), true), ++ field(fieldKeyStartTime, convert.Int64ToBytes(1000), true), ++ field(index.FieldKey{TagName: "short_name"}, []byte("t3"), false), }, Timestamp: int64(1001), } series4 := index.Document{ EntityValues: []byte("test4"), Fields: []index.Field{ - field(fieldKeyDuration, convert.Int64ToBytes(500), true, true), - field(fieldKeyStartTime, convert.Int64ToBytes(2000), true, true), - { - Key: fieldKeyDuration, - Term: convert.Int64ToBytes(int64(500)), - Store: true, - Index: true, - }, - { - Key: fieldKeyStartTime, - Term: convert.Int64ToBytes(int64(2000)), - Store: true, - Index: true, - }, ++ field(fieldKeyDuration, convert.Int64ToBytes(500), true), ++ field(fieldKeyStartTime, convert.Int64ToBytes(2000), true), }, Timestamp: int64(2001), } @@@ -936,13 -985,6 +936,13 @@@ } } - func field(key index.FieldKey, value []byte, stored, indexed bool) index.Field { ++func field(key index.FieldKey, value []byte, indexed bool) index.Field { + f := index.NewBytesField(key, value) + f.Index = indexed - f.Store = stored ++ f.Store = true + return f +} + func updateData(tester *require.Assertions, s index.SeriesStore) { b1, b2 := generateDocs() tester.NoError(s.UpdateSeriesBatch(b1)) diff --cc pkg/query/logical/expr_literal.go index b49a499f,c21fdd08..c22bd25c --- a/pkg/query/logical/expr_literal.go +++ b/pkg/query/logical/expr_literal.go @@@ -119,24 -107,6 +119,24 @@@ type int64ArrLiteral struct arr []int64 } - func (i *int64ArrLiteral) Field(key index.FieldKey) index.Field { ++func (i *int64ArrLiteral) Field(_ index.FieldKey) index.Field { + logger.Panicf("unsupported generate an index field for int64 array") + return index.Field{} +} + - func (i *int64ArrLiteral) RangeOpts(isUpper bool, includeLower bool, includeUpper bool) index.RangeOpts { ++func (i *int64ArrLiteral) RangeOpts(_ bool, _ bool, _ bool) index.RangeOpts { + logger.Panicf("unsupported generate an index range opts for int64 array") + return index.RangeOpts{} +} + +func (i *int64ArrLiteral) SubExprs() []LiteralExpr { + exprs := make([]LiteralExpr, 0, len(i.arr)) + for _, v := range i.arr { + exprs = append(exprs, newInt64Literal(v)) + } + return exprs +} + func newInt64ArrLiteral(val []int64) *int64ArrLiteral { return &int64ArrLiteral{ arr: val, @@@ -299,24 -260,6 +299,24 @@@ type strArrLiteral struct arr []string } - func (s *strArrLiteral) Field(key index.FieldKey) index.Field { ++func (s *strArrLiteral) Field(_ index.FieldKey) index.Field { + logger.Panicf("unsupported generate an index field for string array") + return index.Field{} +} + - func (s *strArrLiteral) RangeOpts(isUpper bool, includeLower bool, includeUpper bool) index.RangeOpts { ++func (s *strArrLiteral) RangeOpts(_ bool, _ bool, _ bool) index.RangeOpts { + logger.Panicf("unsupported generate an index range opts for string array") + return index.RangeOpts{} +} + +func (s *strArrLiteral) SubExprs() []LiteralExpr { + exprs := make([]LiteralExpr, 0, len(s.arr)) + for _, v := range s.arr { + exprs = append(exprs, str(v)) + } + return exprs +} + func newStrArrLiteral(val []string) *strArrLiteral { return &strArrLiteral{ arr: val, @@@ -397,18 -381,6 +397,18 @@@ var type nullLiteral struct{} - func (s *nullLiteral) Field(key index.FieldKey) index.Field { ++func (s *nullLiteral) Field(_ index.FieldKey) index.Field { + return nullIndexField +} + - func (s *nullLiteral) RangeOpts(isUpper bool, includeLower bool, includeUpper bool) index.RangeOpts { ++func (s *nullLiteral) RangeOpts(_ bool, _ bool, _ bool) index.RangeOpts { + return nullRangeOpts +} + +func (s *nullLiteral) SubExprs() []LiteralExpr { + panic("unimplemented") +} + func newNullLiteral() *nullLiteral { return nullLiteralExpr } diff --cc pkg/query/logical/stream/index_filter.go index e5746710,fa8852b6..bfb83f2f --- a/pkg/query/logical/stream/index_filter.go +++ b/pkg/query/logical/stream/index_filter.go @@@ -104,13 -106,23 +104,13 @@@ func parseConditionToFilter(cond *model ) (index.Filter, [][]*modelv1.TagValue, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: - return newRange(indexRule, index.RangeOpts{ - Lower: bytes.Join(expr.Bytes(), nil), - }), [][]*modelv1.TagValue{entity}, nil + return newRange(indexRule, expr.RangeOpts(false, false, false)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_GE: - return newRange(indexRule, index.RangeOpts{ - IncludesLower: true, - Lower: bytes.Join(expr.Bytes(), nil), - }), [][]*modelv1.TagValue{entity}, nil + return newRange(indexRule, expr.RangeOpts(false, true, false)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_LT: - return newRange(indexRule, index.RangeOpts{ - Upper: bytes.Join(expr.Bytes(), nil), - }), [][]*modelv1.TagValue{entity}, nil + return newRange(indexRule, expr.RangeOpts(true, false, false)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_LE: - return newRange(indexRule, expr.RangeOpts(true, true, false)), [][]*modelv1.TagValue{entity}, nil - return newRange(indexRule, index.RangeOpts{ - IncludesUpper: true, - Upper: bytes.Join(expr.Bytes(), nil), - }), [][]*modelv1.TagValue{entity}, nil ++ return newRange(indexRule, expr.RangeOpts(true, false, true)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_EQ: return newEq(indexRule, expr), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_MATCH:
