hanahmily commented on code in PR #936:
URL: 
https://github.com/apache/skywalking-banyandb/pull/936#discussion_r2686167303


##########
banyand/measure/topn.go:
##########
@@ -784,12 +778,14 @@ type TopNValue struct {
        firstValue      int64
 }
 
-func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+// SetMetadata set valueName and entityTagNames.
+func (t *TopNValue) SetMetadata(valueName string, entityTagNames []string) {
        t.valueName = valueName
        t.entityTagNames = entityTagNames
 }
 
-func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) {
+// AddValue add value and entityValues.
+func (t *TopNValue) AddValue(value int64, entityValues []*modelv1.TagValue) {

Review Comment:
   You don't have to expose it. 



##########
banyand/measure/topn.go:
##########
@@ -784,12 +778,14 @@ type TopNValue struct {
        firstValue      int64
 }
 
-func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+// SetMetadata set valueName and entityTagNames.
+func (t *TopNValue) SetMetadata(valueName string, entityTagNames []string) {

Review Comment:
   Why do you expose this function?



##########
banyand/measure/topn.go:
##########
@@ -953,3 +950,20 @@ func (t *TopNValue) Unmarshal(src []byte, decoder 
*encoding.BytesBlockDecoder) e
 func GroupName(groupTags []string) string {
        return strings.Join(groupTags, "|")
 }
+
+// GenerateTopNValuesDecoder returns a new decoder instance of TopNValues.
+func GenerateTopNValuesDecoder() *encoding.BytesBlockDecoder {

Review Comment:
   ```suggestion
   func generateTopNValuesDecoder() *encoding.BytesBlockDecoder {
   ```



##########
banyand/measure/topn.go:
##########
@@ -842,7 +838,8 @@ func (t *TopNValue) resizeEntities(size, entitySize int) 
[][]*modelv1.TagValue {
        return t.entities
 }
 
-func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
+// Marshal marshal the topNValue to the dst.
+func (t *TopNValue) Marshal(dst []byte) ([]byte, error) {

Review Comment:
   Same as before: this method does not need to be exposed.



##########
banyand/measure/topn.go:
##########
@@ -953,3 +950,20 @@ func (t *TopNValue) Unmarshal(src []byte, decoder 
*encoding.BytesBlockDecoder) e
 func GroupName(groupTags []string) string {
        return strings.Join(groupTags, "|")
 }
+
+// GenerateTopNValuesDecoder returns a new decoder instance of TopNValues.
+func GenerateTopNValuesDecoder() *encoding.BytesBlockDecoder {
+       v := topNValuesDecoderPool.Get()
+       if v == nil {
+               return &encoding.BytesBlockDecoder{}
+       }
+       return v
+}
+
+// ReleaseTopNValuesDecoder releases a decoder instance of TopNValues.
+func ReleaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {

Review Comment:
   ```suggestion
   func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
   ```



##########
banyand/measure/block.go:
##########
@@ -704,8 +706,80 @@ func (bc *blockCursor) replace(r *model.MeasureResult, 
storedIndexValue map[comm
                        }
                }
        }
-       for i, c := range bc.fields.columns {
-               r.Fields[i].Values[len(r.Fields[i].Values)-1] = 
mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+       if topNPostAggregator != nil {
+               topNValue := GenerateTopNValue()
+               defer ReleaseTopNValue(topNValue)
+               decoder := GenerateTopNValuesDecoder()
+               defer ReleaseTopNValuesDecoder(decoder)
+
+               uTimestamps := uint64(bc.timestamps[bc.idx])
+
+               for i, c := range bc.fields.columns {
+                       srcFieldValue := 
r.Fields[i].Values[len(r.Fields[i].Values)-1]
+                       destFieldValue := mustDecodeFieldValue(c.valueType, 
c.values[bc.idx])
+
+                       topNValue.Reset()
+
+                       if err := 
topNValue.Unmarshal(srcFieldValue.GetBinaryData(), decoder); err != nil {
+                               log.Error().Err(err).Msg("failed to unmarshal 
topN value, skip current batch")
+                               continue
+                       }
+
+                       valueName := topNValue.valueName
+                       entityTagNames := topNValue.entityTagNames
+
+                       for j, entityList := range topNValue.entities {
+                               entityValues := make(pbv1.EntityValues, 0, 
len(topNValue.entityValues))

Review Comment:
   This suggestion makes sense. Please fix it.



##########
banyand/measure/query.go:
##########
@@ -852,7 +872,7 @@ func (qr *queryResult) merge(storedIndexValue 
map[common.SeriesID]map[string]*mo
                if len(result.Timestamps) > 0 &&
                        topBC.timestamps[topBC.idx] == 
result.Timestamps[len(result.Timestamps)-1] {
                        if topBC.versions[topBC.idx] > lastVersion {
-                               topBC.replace(result, storedIndexValue)
+                               topBC.replace(result, storedIndexValue, 
topNPostAggregator)

Review Comment:
   1. Do not compare the version; always merge values with the same timestamp. 
   2. Create a new function; keep the "replace" functionality as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to