hanahmily commented on code in PR #947:
URL: https://github.com/apache/skywalking-banyandb/pull/947#discussion_r2707460064
##########
banyand/internal/wqueue/wqueue.go:
##########
@@ -220,3 +220,12 @@ func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange {
func (q *Queue[S, O]) GetNodes(shardID common.ShardID) []string {
	return q.opts.GetNodes(shardID)
}
+
+// Shards returns all shards in the queue.
+func (q *Queue[S, O]) Shards() []*Shard[S] {
Review Comment:
Since we only ever access each shard's sub-queue here, we can simplify the
type and replace `Shards` with an `AllSubQueue` accessor. This removes unused
fields and makes the dependency explicit.
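
For illustration, a minimal sketch of that accessor (the internal `shards` field is an assumption about `Queue`'s layout; `SubQueue` mirrors the accessor this PR already calls on each shard):

```go
// AllSubQueue returns every shard's sub-queue directly, replacing Shards.
func (q *Queue[S, O]) AllSubQueue() []S {
	subQueues := make([]S, 0, len(q.shards))
	for _, shard := range q.shards {
		subQueues = append(subQueues, shard.SubQueue())
	}
	return subQueues
}
```

Callers such as `collectPendingWriteInfo` then iterate sub-queues without ever touching `Shard`.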
##########
banyand/stream/metadata.go:
##########
@@ -231,17 +236,228 @@ func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) {
}
func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option], error) {
+	if sr == nil {
+		return nil, fmt.Errorf("schemaRepo is nil")
+	}
	g, ok := sr.LoadGroup(groupName)
	if !ok {
		return nil, fmt.Errorf("group %s not found", groupName)
	}
	db := g.SupplyTSDB()
	if db == nil {
-		return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+		return nil, fmt.Errorf("group %s not found", groupName)
	}
	return db.(storage.TSDB[*tsTable, option]), nil
}
+
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+	if sr.nodeID == "" {
+		return nil, fmt.Errorf("node ID is empty")
+	}
+	node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+	if nodeErr != nil {
+		return nil, fmt.Errorf("failed to get current node info: %w", nodeErr)
+	}
+	tsdb, tsdbErr := sr.loadTSDB(group)
+	if tsdbErr != nil {
+		return nil, tsdbErr
+	}
+	if tsdb == nil {
+		return nil, nil
+	}
+	segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+		Start: time.Unix(0, 0),
+		End:   time.Unix(0, timestamp.MaxNanoTime),
+	})
+	if segmentsErr != nil {
+		return nil, segmentsErr
+	}
+	var segmentInfoList []*databasev1.SegmentInfo
+	var totalDataSize int64
+	for _, segment := range segments {
+		timeRange := segment.GetTimeRange()
+		tables, _ := segment.Tables()
+		var shardInfoList []*databasev1.ShardInfo
+		for shardIdx, table := range tables {
+			shardInfo := sr.collectShardInfo(table, uint32(shardIdx))
+			shardInfoList = append(shardInfoList, shardInfo)
+			totalDataSize += shardInfo.DataSizeBytes
+		}
+		seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+		totalDataSize += seriesIndexInfo.DataSizeBytes
+		segmentInfo := &databasev1.SegmentInfo{
+			SegmentId:       fmt.Sprintf("%d-%d", timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+			TimeRangeStart:  timeRange.Start.Format(time.RFC3339Nano),
+			TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+			ShardInfo:       shardInfoList,
+			SeriesIndexInfo: seriesIndexInfo,
+		}
+		segmentInfoList = append(segmentInfoList, segmentInfo)
+	}
+	dataInfo := &databasev1.DataInfo{
+		Node:          node,
+		SegmentInfo:   segmentInfoList,
+		DataSizeBytes: totalDataSize,
+	}
+	return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, option]) *databasev1.SeriesIndexInfo {
+	indexDB := segment.IndexDB()
+	if indexDB == nil {
+		return &databasev1.SeriesIndexInfo{
+			DataCount:     0,
+			DataSizeBytes: 0,
+		}
+	}
+	dataCount, dataSizeBytes := indexDB.Stats()
+	return &databasev1.SeriesIndexInfo{
+		DataCount:     dataCount,
+		DataSizeBytes: dataSizeBytes,
+	}
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.ShardInfo {
+	tst, ok := table.(*tsTable)
+	if !ok {
+		return &databasev1.ShardInfo{
+			ShardId:       shardID,
+			DataCount:     0,
+			DataSizeBytes: 0,
+			PartCount:     0,
+		}
+	}
+	tst.RLock()
+	defer tst.RUnlock()
+	snapshot := tst.snapshot
+	if snapshot == nil {
+		return &databasev1.ShardInfo{
+			ShardId:       shardID,
+			DataCount:     0,
+			DataSizeBytes: 0,
+			PartCount:     0,
+		}
+	}
+	var totalCount, compressedSize, uncompressedSize, partCount uint64
+	for _, pw := range snapshot.parts {
+		if pw.p != nil {
+			totalCount += pw.p.partMetadata.TotalCount
+			compressedSize += pw.p.partMetadata.CompressedSizeBytes
+			uncompressedSize += pw.p.partMetadata.UncompressedSizeBytes
+			partCount++
+		} else if pw.mp != nil {
+			totalCount += pw.mp.partMetadata.TotalCount
+			compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+			uncompressedSize += pw.mp.partMetadata.UncompressedSizeBytes
+			partCount++
+		}
+	}
+	invertedIndexInfo := sr.collectInvertedIndexInfo(tst)
+	return &databasev1.ShardInfo{
+		ShardId:           shardID,
+		DataCount:         int64(totalCount),
+		DataSizeBytes:     int64(compressedSize),
+		PartCount:         int64(partCount),
+		InvertedIndexInfo: invertedIndexInfo,
+		SidxInfo:          &databasev1.SIDXInfo{},
+	}
+}
+
+func (sr *schemaRepo) collectInvertedIndexInfo(tst *tsTable) *databasev1.InvertedIndexInfo {
+	if tst.index == nil {
+		return &databasev1.InvertedIndexInfo{
+			DataCount:     0,
+			DataSizeBytes: 0,
+		}
+	}
+	dataCount, dataSizeBytes := tst.index.store.Stats()
+	return &databasev1.InvertedIndexInfo{
+		DataCount:     dataCount,
+		DataSizeBytes: dataSizeBytes,
+	}
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) {
+	if sr == nil || sr.Repository == nil {
+		return 0, fmt.Errorf("schema repository is not initialized")
+	}
+	if sr.role == databasev1.Role_ROLE_LIAISON {
+		queue, queueErr := sr.loadQueue(groupName)
+		if queueErr != nil {
+			return 0, fmt.Errorf("failed to load queue: %w", queueErr)
+		}
+		if queue == nil {
+			return 0, nil
+		}
+		var pendingWriteCount int64
+		for _, shard := range queue.Shards() {
+			tst := shard.SubQueue()
+			if tst != nil {
+				pendingWriteCount += tst.pendingDataCount.Load()
+			}
+		}
+		return pendingWriteCount, nil
+	}
+	// Standalone mode
+	tsdb, tsdbErr := sr.loadTSDB(groupName)
+	if tsdbErr != nil {
+		return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+	}
+	if tsdb == nil {
+		return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+	}
+	segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Review Comment:
Invoke `segment.DecRef()` on each selected segment to release them.
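
For example, a deferred release right after the selection covers every return path (a sketch; it assumes `DecRef` is the only cleanup the selected segments need):

```go
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
	Start: time.Unix(0, 0),
	End:   time.Unix(0, timestamp.MaxNanoTime),
})
if segmentsErr != nil {
	return nil, segmentsErr
}
// Release every selected segment, whichever way the function returns.
defer func() {
	for _, segment := range segments {
		segment.DecRef()
	}
}()
```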
##########
banyand/stream/tstable.go:
##########
@@ -49,21 +49,22 @@ const (
)
type tsTable struct {
-	fileSystem    fs.FileSystem
-	pm            protector.Memory
-	metrics       *metrics
-	index         *elementIndex
-	snapshot      *snapshot
-	loopCloser    *run.Closer
-	getNodes      func() []string
-	l             *logger.Logger
-	introductions chan *introduction
-	p             common.Position
-	group         string
-	root          string
-	gc            garbageCleaner
-	option        option
-	curPartID     uint64
+	fileSystem       fs.FileSystem
+	pm               protector.Memory
+	metrics          *metrics
+	index            *elementIndex
+	snapshot         *snapshot
+	loopCloser       *run.Closer
+	getNodes         func() []string
+	l                *logger.Logger
+	introductions    chan *introduction
+	p                common.Position
+	group            string
+	root             string
+	gc               garbageCleaner
+	option           option
+	curPartID        uint64
+	pendingDataCount atomic.Int64
Review Comment:
Would you please move `pendingDataCount` into `metrics` and expose it as a metric?
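
A minimal sketch of that move (only the field placement is being illustrated; wiring it to the repo's meter as a proper gauge would follow the pattern of the existing fields, and `sync/atomic` stays imported):

```go
type metrics struct {
	// ... existing metric fields ...

	// pendingDataCount tracks elements accepted for write but not yet
	// flushed, so it is reported alongside the other table metrics
	// instead of living as a bare field on tsTable.
	pendingDataCount atomic.Int64
}
```

Call sites would then go through `tst.metrics.pendingDataCount` instead of `tst.pendingDataCount`.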
##########
banyand/metadata/schema/etcd.go:
##########
@@ -128,14 +132,20 @@ func CheckInterval(d time.Duration) WatcherOption {
	}
}
+var _ Registry = (*etcdSchemaRegistry)(nil)
+
type etcdSchemaRegistry struct {
Review Comment:
`etcd` will become optional soon. Please do not add any new functionality to
it. Instead, create a new struct to implement these features.
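
A sketch of that separation (the `dataInfoRegistry` name and constructor are illustrative, not part of this PR):

```go
// dataInfoRegistry layers the new behavior on top of any Registry
// implementation, leaving etcdSchemaRegistry untouched.
type dataInfoRegistry struct {
	Registry // delegates all existing schema operations
}

func newDataInfoRegistry(base Registry) *dataInfoRegistry {
	return &dataInfoRegistry{Registry: base}
}
```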
##########
banyand/stream/metadata.go:
##########
@@ -231,17 +236,228 @@ func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) {
[... hunk context identical to the banyand/stream/metadata.go section above, through collectInvertedIndexInfo ...]
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) {
+	if sr == nil || sr.Repository == nil {
+		return 0, fmt.Errorf("schema repository is not initialized")
+	}
+	if sr.role == databasev1.Role_ROLE_LIAISON {
+		queue, queueErr := sr.loadQueue(groupName)
+		if queueErr != nil {
+			return 0, fmt.Errorf("failed to load queue: %w", queueErr)
+		}
+		if queue == nil {
+			return 0, nil
+		}
+		var pendingWriteCount int64
+		for _, shard := range queue.Shards() {
+			tst := shard.SubQueue()
+			if tst != nil {
+				pendingWriteCount += tst.pendingDataCount.Load()
+			}
+		}
+		return pendingWriteCount, nil
+	}
+	// Standalone mode
+	tsdb, tsdbErr := sr.loadTSDB(groupName)
+	if tsdbErr != nil {
+		return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+	}
+	if tsdb == nil {
+		return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+	}
+	segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+		Start: time.Unix(0, 0),
+		End:   time.Unix(0, timestamp.MaxNanoTime),
+	})
+	if segmentsErr != nil {
+		return 0, fmt.Errorf("failed to select segments: %w", segmentsErr)
+	}
+	var pendingWriteCount int64
+	for _, segment := range segments {
+		tables, _ := segment.Tables()
+		for _, tst := range tables {
+			pendingWriteCount += tst.pendingDataCount.Load()
+		}
+	}
+	return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount int64, totalSizeBytes int64, err error) {
+	if sr == nil || sr.Repository == nil {
+		return 0, 0, fmt.Errorf("schema repository is not initialized")
+	}
+	// Only liaison nodes collect pending sync info
+	if sr.role != databasev1.Role_ROLE_LIAISON {
+		return 0, 0, nil
+	}
+	queue, queueErr := sr.loadQueue(groupName)
+	if queueErr != nil {
+		return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+	}
+	if queue == nil {
+		return 0, 0, nil
+	}
+	for _, shard := range queue.Shards() {
+		tst := shard.SubQueue()
+		if tst != nil {
+			tst.RLock()
+			snapshot := tst.snapshot
Review Comment:
Use `currentSnapshot` to avoid taking the lock directly.
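
The loop would then look roughly like this (a sketch assuming `currentSnapshot` takes the lock, bumps the snapshot's reference count, and returns it, with `decRef` as the matching release):

```go
for _, shard := range queue.Shards() {
	tst := shard.SubQueue()
	if tst == nil {
		continue
	}
	snp := tst.currentSnapshot() // nil when the table has no snapshot yet
	if snp == nil {
		continue
	}
	for _, pw := range snp.parts {
		partCount++
		if pw.p != nil {
			totalSizeBytes += int64(pw.p.partMetadata.CompressedSizeBytes)
		} else if pw.mp != nil {
			totalSizeBytes += int64(pw.mp.partMetadata.CompressedSizeBytes)
		}
	}
	snp.decRef()
}
```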