This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 04e17d0b Fix memory leaks and OOM issues in streaming processing (#777)
04e17d0b is described below
commit 04e17d0bb19fbcaa0e1eb8443d2029a222635dbb
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Sep 21 09:25:31 2025 +0800
Fix memory leaks and OOM issues in streaming processing (#777)
---
CHANGES.md | 1 +
banyand/measure/write_liaison.go | 8 +-
banyand/queue/pub/pub.go | 11 +-
banyand/stream/write_liaison.go | 8 +-
pkg/flow/dedup_priority_queue.go | 54 ++++-
pkg/flow/dedup_priority_queue_test.go | 370 ++++++++++++++++++++++++++++++
pkg/flow/streaming/sliding_window.go | 110 ++++-----
pkg/flow/streaming/sliding_window_test.go | 137 +++++++++++
pkg/flow/types.go | 2 +-
9 files changed, 634 insertions(+), 67 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 79a44f03..3ebd0e0e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
- Fix topN parsing panic when the criteria is set.
- Remove the indexed_only field in TagSpec.
- Fix returning empty result when using IN operator on the array type tags.
+- Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
- Fix etcd prefix matching any key that starts with this prefix.
### Document
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 4b360c78..7b287ff5 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -131,9 +131,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
- _, publishErr := w.tire2Client.Publish(ctx, topic, message)
+ future, publishErr := w.tire2Client.Publish(ctx, topic, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+ continue
+ }
+ _, err := future.Get()
+ if err != nil {
+ w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+ continue
}
}
}
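The change above (mirrored in banyand/stream/write_liaison.go below) moves from fire-and-forget to publish-then-drain: the future returned by Publish holds per-node send streams (see the pub.go hunk below), so discarding it left those streams unreleased. A minimal sketch of the new calling convention, using simplified stand-in types rather than the real bus/queue interfaces:

package sketch

import (
	"context"
	"fmt"
)

// message, future, and client are hypothetical stand-ins for the real
// bus.Message, bus.Future, and tier-2 client types.
type message struct{ payload []byte }

type future interface {
	// Get blocks until the remote node replies (or the stream fails).
	Get() (message, error)
}

type client interface {
	Publish(ctx context.Context, topic string, m message) (future, error)
}

// publishAndWait drains the future instead of discarding it; an unconsumed
// future keeps its send stream open, which is the leak this commit addresses.
func publishAndWait(ctx context.Context, c client, topic string, m message) error {
	f, err := c.Publish(ctx, topic, m)
	if err != nil {
		return fmt.Errorf("publish to %s failed: %w", topic, err)
	}
	if _, err := f.Get(); err != nil {
		return fmt.Errorf("no response on %s: %w", topic, err)
	}
	return nil
}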
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 74fa2432..e8a397ed 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -229,7 +229,9 @@ type publishResult struct {
func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
var err error
- f := &future{}
+ f := &future{
+ log: p.log,
+ }
handleMessage := func(m bus.Message, err error) error {
r, errSend := messageToRequest(topic, m)
if errSend != nil {
@@ -362,6 +364,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
}
type future struct {
+ log *logger.Logger
clients []clusterv1.Service_SendClient
cancelFn []func()
topics []bus.Topic
@@ -372,10 +375,16 @@ func (l *future) Get() (bus.Message, error) {
if len(l.clients) < 1 {
return bus.Message{}, io.EOF
}
+
c := l.clients[0]
t := l.topics[0]
n := l.nodes[0]
+
defer func() {
+ if err := c.CloseSend(); err != nil {
+ l.log.Error().Err(err).Msg("failed to close send stream")
+ }
+
l.clients = l.clients[1:]
l.topics = l.topics[1:]
l.cancelFn[0]()
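The other half of the fix is in future.Get itself: each call now closes the send side of the stream it consumes before dropping it from the head of the parallel slices, so half-open streams no longer accumulate across calls. A sketch of that cleanup ordering, with a hypothetical stand-in for clusterv1.Service_SendClient:

package sketch

import "log"

// sendStream stands in for clusterv1.Service_SendClient; only CloseSend
// matters for this sketch.
type sendStream interface {
	CloseSend() error
}

type futureSketch struct {
	clients  []sendStream
	cancelFn []func()
}

// get mirrors the fixed future.Get: the deferred block closes the send
// stream and cancels its context before advancing to the next stream.
func (f *futureSketch) get() {
	if len(f.clients) < 1 {
		return
	}
	c := f.clients[0]
	defer func() {
		if err := c.CloseSend(); err != nil {
			log.Printf("failed to close send stream: %v", err)
		}
		f.cancelFn[0]() // release the stream's context as well
		f.clients = f.clients[1:]
		f.cancelFn = f.cancelFn[1:]
	}()
	// ... block here on the stream for the node's response ...
}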
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index d8478f1b..c78d1a37 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -204,9 +204,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
// Send to all nodes for this shard
for _, node := range nodes {
message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
- _, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
+ future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
if publishErr != nil {
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+ continue
+ }
+ _, err := future.Get()
+ if err != nil {
+ w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+ continue
}
}
}
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index 527ab6f5..becbd55d 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -31,11 +31,21 @@ type Element interface {
SetIndex(int)
}
+// HashableElement represents an element that can be hashed and compared for equality.
+type HashableElement interface {
+ Element
+ // Hash returns a hash value for this element
+ Hash() uint64
+ // Equal compares this element with another for content equality
+ Equal(HashableElement) bool
+}
+
// DedupPriorityQueue implements heap.Interface.
// DedupPriorityQueue is not thread-safe.
type DedupPriorityQueue struct {
comparator utils.Comparator
cache map[Element]struct{}
+ hashCache map[uint64][]HashableElement // For content-based deduplication
Items []Element
allowDuplicates bool
}
@@ -46,6 +56,7 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
comparator: comparator,
Items: make([]Element, 0),
cache: make(map[Element]struct{}),
+ hashCache: make(map[uint64][]HashableElement),
allowDuplicates: allowDuplicates,
}
}
@@ -60,6 +71,9 @@ func (pq *DedupPriorityQueue) Less(i, j int) bool {
// Swap exchanges indexes of the items.
func (pq *DedupPriorityQueue) Swap(i, j int) {
+ if i < 0 || i >= len(pq.Items) || j < 0 || j >= len(pq.Items) {
+ panic("index out of range in DedupPriorityQueue.Swap")
+ }
pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
pq.Items[i].SetIndex(i)
pq.Items[j].SetIndex(j)
@@ -71,11 +85,29 @@ func (pq *DedupPriorityQueue) Push(x interface{}) {
item := x.(Element)
// if duplicates is not allowed
if !pq.allowDuplicates {
- // use mutex to protect cache and items
- // check existence
+ // Check for reference-based duplicates first
if _, ok := pq.cache[item]; ok {
return
}
+
+ // Check for content-based duplicates if the item implements HashableElement
+ if hashableItem, ok := item.(HashableElement); ok {
+ hash := hashableItem.Hash()
+ if existingItems, exists := pq.hashCache[hash]; exists {
+ // Check if any existing item has the same content
+ for _, existing := range existingItems {
+ if hashableItem.Equal(existing) {
+ return // Duplicate found, don't add
+ }
+ }
+ // No duplicate found, add to hash cache
+ pq.hashCache[hash] = append(pq.hashCache[hash], hashableItem)
+ } else {
+ // First item with this hash
+ pq.hashCache[hash] = []HashableElement{hashableItem}
+ }
+ }
+
pq.cache[item] = struct{}{}
}
n := len(pq.Items)
@@ -90,6 +122,24 @@ func (pq *DedupPriorityQueue) Pop() interface{} {
item := pq.Items[n-1]
item.SetIndex(-1) // for safety
delete(pq.cache, item)
+
+ // Clean up hash cache if item implements HashableElement
+ if hashableItem, ok := item.(HashableElement); ok {
+ hash := hashableItem.Hash()
+ if existingItems, exists := pq.hashCache[hash]; exists {
+ // Remove the specific item from the hash cache
+ for i, existing := range existingItems {
+ if hashableItem.Equal(existing) {
+ pq.hashCache[hash] = append(existingItems[:i], existingItems[i+1:]...)
+ if len(pq.hashCache[hash]) == 0 {
+ delete(pq.hashCache, hash)
+ }
+ break
+ }
+ }
+ }
+ }
+
pq.Items = pq.Items[0 : n-1]
return item
}
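An element opts into the new content-based pass by implementing Hash and Equal on top of Element; the hash buckets tolerate collisions because Equal makes the final call. A minimal sketch of a conforming element (hypothetical, assumed to sit in package flow next to DedupPriorityQueue; the real consumer is internalTimer in pkg/flow/streaming, and the test file added below exercises the same idea more thoroughly):

package flow

import (
	"container/heap"
	"hash/fnv"
)

// keyedElement is a hypothetical element deduplicated by key content
// rather than by pointer identity.
type keyedElement struct {
	key   string
	index int
}

func (e *keyedElement) GetIndex() int    { return e.index }
func (e *keyedElement) SetIndex(idx int) { e.index = idx }

func (e *keyedElement) Hash() uint64 {
	h := fnv.New64a()
	h.Write([]byte(e.key)) // hash the content, not the pointer
	return h.Sum64()
}

func (e *keyedElement) Equal(other HashableElement) bool {
	o, ok := other.(*keyedElement)
	return ok && e.key == o.key // resolves hash-bucket collisions
}

// Two distinct allocations with equal content collapse to one entry.
func dedupDemo() int {
	pq := NewPriorityQueue(func(a, b interface{}) int { return 0 }, false)
	heap.Push(pq, &keyedElement{key: "w1"})
	heap.Push(pq, &keyedElement{key: "w1"}) // dropped as a duplicate
	return pq.Len() // 1
}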
diff --git a/pkg/flow/dedup_priority_queue_test.go b/pkg/flow/dedup_priority_queue_test.go
new file mode 100644
index 00000000..d79b0187
--- /dev/null
+++ b/pkg/flow/dedup_priority_queue_test.go
@@ -0,0 +1,370 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flow
+
+import (
+ "container/heap"
+ "fmt"
+ "testing"
+)
+
+type testHashableElement struct {
+ id string
+ val int
+ index int
+}
+
+func (t *testHashableElement) GetIndex() int {
+ return t.index
+}
+
+func (t *testHashableElement) SetIndex(idx int) {
+ t.index = idx
+}
+
+func (t *testHashableElement) Hash() uint64 {
+ // Simple hash based on id
+ hash := uint64(0)
+ for _, b := range []byte(t.id) {
+ hash = hash*31 + uint64(b)
+ }
+ return hash
+}
+
+func (t *testHashableElement) Equal(other HashableElement) bool {
+ if otherElem, ok := other.(*testHashableElement); ok {
+ return t.id == otherElem.id
+ }
+ return false
+}
+
+type testElement struct {
+ val int
+ index int
+}
+
+func (t *testElement) GetIndex() int {
+ return t.index
+}
+
+func (t *testElement) SetIndex(idx int) {
+ t.index = idx
+}
+
+func TestHashableElementHash(t *testing.T) {
+ elem1 := &testHashableElement{id: "test"}
+ elem2 := &testHashableElement{id: "test"}
+ elem3 := &testHashableElement{id: "different"}
+
+ if elem1.Hash() != elem2.Hash() {
+ t.Errorf("Expected same hash for elements with same id, got %d and %d", elem1.Hash(), elem2.Hash())
+ }
+
+ if elem1.Hash() == elem3.Hash() {
+ t.Errorf("Expected different hash for elements with different id, got same hash %d", elem1.Hash())
+ }
+}
+
+func TestHashableElementEqual(t *testing.T) {
+ elem1 := &testHashableElement{id: "test", val: 10}
+ elem2 := &testHashableElement{id: "test", val: 20} // Same id, different val
+ elem3 := &testHashableElement{id: "different", val: 10} // Different id, same val
+
+ // Same id should be equal regardless of other fields
+ if !elem1.Equal(elem2) {
+ t.Error("Expected elements with same id to be equal")
+ }
+
+ // Different id should not be equal
+ if elem1.Equal(elem3) {
+ t.Error("Expected elements with different id to not be equal")
+ }
+}
+
+func TestDedupPriorityQueue_ContentBasedDeduplication(t *testing.T) {
+ // Create a priority queue with deduplication enabled
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Create two elements with the same id (should be deduplicated)
+ elem1 := &testHashableElement{id: "test", val: 10}
+ elem2 := &testHashableElement{id: "test", val: 10}
+
+ // Push both elements
+ heap.Push(pq, elem1)
+ heap.Push(pq, elem2)
+
+ // Should only have one element in the heap
+ if pq.Len() != 1 {
+ t.Errorf("Expected 1 element in heap after deduplication, got %d", pq.Len())
+ }
+
+ // Create elements with different ids (should not be deduplicated)
+ elem3 := &testHashableElement{id: "different", val: 20}
+ heap.Push(pq, elem3)
+
+ // Should now have two elements
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements in heap after adding different element, got %d", pq.Len())
+ }
+}
+
+func TestDedupPriorityQueue_AllowDuplicates(t *testing.T) {
+ // Create a priority queue with duplicates allowed
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, true)
+
+ // Create two elements with the same id
+ elem1 := &testHashableElement{id: "test", val: 10}
+ elem2 := &testHashableElement{id: "test", val: 10}
+
+ // Push both elements
+ heap.Push(pq, elem1)
+ heap.Push(pq, elem2)
+
+ // Should have two elements in the heap (duplicates allowed)
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements in heap when duplicates are allowed, got %d", pq.Len())
+ }
+
+ // Even pushing the same reference should be allowed
+ heap.Push(pq, elem1)
+ if pq.Len() != 3 {
+ t.Errorf("Expected 3 elements in heap after pushing same reference when duplicates allowed, got %d", pq.Len())
+ }
+}
+
+func TestDedupPriorityQueue_HashCollisionHandling(t *testing.T) {
+ // Create a priority queue with deduplication enabled
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Create elements that might have hash collisions
+ elem1 := &testHashableElement{id: "ab", val: 10} // Hash might collide with "ba"
+ elem2 := &testHashableElement{id: "ba", val: 10} // Different id, might have same hash
+ elem3 := &testHashableElement{id: "ab", val: 20} // Same id as elem1, should be deduplicated
+
+ heap.Push(pq, elem1)
+ heap.Push(pq, elem2)
+ heap.Push(pq, elem3)
+
+ // Should have 2 elements: elem1 (or elem3, they're equivalent) and elem2
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements after hash collision test, got %d", pq.Len())
+ }
+}
+
+func TestDedupPriorityQueue_PopCleanup(t *testing.T) {
+ // Create a priority queue with deduplication enabled
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Add elements
+ elem1 := &testHashableElement{id: "test1", val: 10}
+ elem2 := &testHashableElement{id: "test2", val: 20}
+ elem3 := &testHashableElement{id: "test1", val: 30} // Same id as elem1
+
+ heap.Push(pq, elem1)
+ heap.Push(pq, elem2)
+ heap.Push(pq, elem3) // Should be deduplicated
+
+ // Should have 2 elements (elem1 and elem2)
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements after deduplication, got %d", pq.Len())
+ }
+
+ // Pop an element
+ popped := heap.Pop(pq).(*testHashableElement)
+
+ // Should have 1 element left
+ if pq.Len() != 1 {
+ t.Errorf("Expected 1 element after pop, got %d", pq.Len())
+ }
+
+ // Try to add the same element again - should be allowed since it was popped
+ heap.Push(pq, popped)
+
+ // Should now have 2 elements again
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements after re-adding popped element, got %d", pq.Len())
+ }
+}
+
+func TestDedupPriorityQueue_MixedElementTypes(t *testing.T) {
+ // Create a priority queue that can handle both Element and HashableElement
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ switch va := a.(type) {
+ case *testHashableElement:
+ switch vb := b.(type) {
+ case *testHashableElement:
+ return va.val - vb.val
+ case *testElement:
+ return va.val - vb.val
+ }
+ case *testElement:
+ switch vb := b.(type) {
+ case *testHashableElement:
+ return va.val - vb.val
+ case *testElement:
+ return va.val - vb.val
+ }
+ }
+ return 0
+ }, false)
+
+ // Add mixed element types
+ hashElem1 := &testHashableElement{id: "test", val: 10}
+ hashElem2 := &testHashableElement{id: "test", val: 20} // Same id, should be deduplicated
+ regularElem := &testElement{val: 15}
+
+ heap.Push(pq, hashElem1)
+ heap.Push(pq, regularElem)
+ heap.Push(pq, hashElem2) // Should be deduplicated with hashElem1
+
+ // Should have 2 elements: hashElem1 and regularElem
+ if pq.Len() != 2 {
+ t.Errorf("Expected 2 elements with mixed types, got %d", pq.Len())
+ }
+}
+
+func TestDedupPriorityQueue_EmptyHeapOperations(t *testing.T) {
+ // Create an empty priority queue
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Test operations on empty heap
+ if pq.Len() != 0 {
+ t.Errorf("Expected empty heap to have length 0, got %d", pq.Len())
+ }
+
+ if pq.Peek() != nil {
+ t.Error("Expected Peek() on empty heap to return nil")
+ }
+
+ // Test that Pop panics on empty heap
+ defer func() {
+ if r := recover(); r == nil {
+ t.Error("Expected Pop() on empty heap to panic")
+ }
+ }()
+ heap.Pop(pq)
+}
+
+func TestDedupPriorityQueue_PriorityOrdering(t *testing.T) {
+ // Test that deduplication doesn't affect priority ordering
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Add elements in random order
+ elem1 := &testHashableElement{id: "high", val: 30}
+ elem2 := &testHashableElement{id: "low", val: 10}
+ elem3 := &testHashableElement{id: "high", val: 40} // Same id as elem1, should be deduplicated
+ elem4 := &testHashableElement{id: "medium", val: 20}
+
+ heap.Push(pq, elem1)
+ heap.Push(pq, elem2)
+ heap.Push(pq, elem3) // Should be deduplicated
+ heap.Push(pq, elem4)
+
+ // Should have 3 elements (elem3 deduplicated)
+ if pq.Len() != 3 {
+ t.Errorf("Expected 3 elements after deduplication, got %d", pq.Len())
+ }
+
+ // Pop elements and verify they come out in priority order
+ expectedOrder := []int{10, 20, 30} // elem2, elem4, elem1
+ for i, expectedVal := range expectedOrder {
+ if pq.Len() == 0 {
+ t.Errorf("Expected more elements, but heap is empty at position %d", i)
+ break
+ }
+ popped := heap.Pop(pq).(*testHashableElement)
+ if popped.val != expectedVal {
+ t.Errorf("Expected value %d at position %d, got %d", expectedVal, i, popped.val)
+ }
+ }
+}
+
+func TestDedupPriorityQueue_LargeScaleDeduplication(t *testing.T) {
+ // Test deduplication with many elements
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Add 100 elements, but only 10 unique ids
+ for i := 0; i < 100; i++ {
+ id := "id" + string(rune(i%10+'0')) // id0, id1, ..., id9, id0, id1, ...
+ elem := &testHashableElement{id: id, val: i}
+ heap.Push(pq, elem)
+ }
+
+ // Should have only 10 unique elements
+ if pq.Len() != 10 {
+ t.Errorf("Expected 10 unique elements, got %d", pq.Len())
+ }
+
+ // Verify all remaining elements have unique ids
+ seen := make(map[string]bool)
+ for pq.Len() > 0 {
+ elem := heap.Pop(pq).(*testHashableElement)
+ if seen[elem.id] {
+ t.Errorf("Found duplicate id %s in final heap", elem.id)
+ }
+ seen[elem.id] = true
+ }
+
+ // Should have seen exactly 10 unique ids
+ if len(seen) != 10 {
+ t.Errorf("Expected 10 unique ids, got %d", len(seen))
+ }
+}
+
+func BenchmarkDedupPriorityQueue_Push(b *testing.B) {
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ elem := &testHashableElement{id: "benchmark", val: i}
+ heap.Push(pq, elem)
+ }
+}
+
+func BenchmarkDedupPriorityQueue_Pop(b *testing.B) {
+ pq := NewPriorityQueue(func(a, b interface{}) int {
+ return a.(*testHashableElement).val - b.(*testHashableElement).val
+ }, false)
+
+ // Pre-populate with unique elements
+ for i := 0; i < b.N; i++ {
+ elem := &testHashableElement{id: fmt.Sprintf("unique_%d", i), val: i}
+ heap.Push(pq, elem)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N && pq.Len() > 0; i++ {
+ heap.Pop(pq)
+ }
+}
diff --git a/pkg/flow/streaming/sliding_window.go b/pkg/flow/streaming/sliding_window.go
index ed012084..b049dd5d 100644
--- a/pkg/flow/streaming/sliding_window.go
+++ b/pkg/flow/streaming/sliding_window.go
@@ -142,7 +142,7 @@ func (s *tumblingTimeWindows) flushDueWindows() {
defer s.timerMu.Unlock()
for {
if lookAhead, ok := s.timerHeap.Peek().(*internalTimer); ok {
- if lookAhead.triggerTimeMillis <= s.currentWatermark {
+ if lookAhead.w.MaxTimestamp() <= s.currentWatermark {
oldestTimer := heap.Pop(s.timerHeap).(*internalTimer)
s.flushWindow(oldestTimer.w)
continue
@@ -162,37 +162,32 @@ func (s *tumblingTimeWindows) receive() {
defer s.Done()
for elem := range s.in {
- assignedWindows, err := s.AssignWindows(elem.TimestampMillis())
+ assignedWindow, err := s.AssignWindows(elem.TimestampMillis())
if err != nil {
s.errorHandler(err)
continue
}
- ctx := triggerContext{
- delegation: s,
+ // drop if the window is late
+ if s.isWindowLate(assignedWindow) {
+ continue
}
- for _, w := range assignedWindows {
- // drop if the window is late
- if s.isWindowLate(w) {
- continue
- }
- tw := w.(timeWindow)
- ctx.window = tw
- // add elem to the bucket
- if oldAggr, ok := s.snapshots.Get(tw); ok {
- oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem})
- } else {
- newAggr := s.aggregationFactory()
- newAggr.Add([]flow.StreamRecord{elem})
- s.snapshots.Add(tw, newAggr)
- if e := s.l.Debug(); e.Enabled() {
- e.Stringer("window", tw).Msg("create new window")
- }
+ tw := assignedWindow.(timeWindow)
+ // add elem to the bucket
+ if oldAggr, ok := s.snapshots.Get(tw); ok {
+ oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem})
+ } else {
+ newAggr := s.aggregationFactory()
+ newAggr.Add([]flow.StreamRecord{elem})
+ s.snapshots.Add(tw, newAggr)
+ if e := s.l.Debug(); e.Enabled() {
+ e.Stringer("window", tw).Msg("create new window")
}
+ }
- result := ctx.OnElement(elem)
- if result == fire {
- s.flushWindow(tw)
- }
+ result := s.eventTimeTriggerOnElement(tw)
+
+ if result == fire {
+ s.flushWindow(tw)
}
// even if the incoming elements do not follow strict order,
@@ -260,7 +255,7 @@ func NewTumblingTimeWindows(size time.Duration, maxFlushInterval time.Duration)
return &tumblingTimeWindows{
windowSize: ws,
timerHeap: flow.NewPriorityQueue(func(a, b interface{}) int {
- return int(a.(*internalTimer).triggerTimeMillis - b.(*internalTimer).triggerTimeMillis)
+ return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp())
}, false),
in: make(chan flow.StreamRecord),
out: make(chan flow.StreamRecord),
@@ -285,14 +280,12 @@ func (t timeWindow) String() string {
}
// AssignWindows assigns windows according to the given timestamp.
-func (s *tumblingTimeWindows) AssignWindows(timestamp int64) ([]flow.Window, error) {
+func (s *tumblingTimeWindows) AssignWindows(timestamp int64) (flow.Window, error) {
if timestamp > math.MinInt64 {
start := getWindowStart(timestamp, s.windowSize)
- return []flow.Window{
- timeWindow{
- start: start,
- end: start + s.windowSize,
- },
+ return timeWindow{
+ start: start,
+ end: start + s.windowSize,
}, nil
}
return nil, errors.New("invalid timestamp from the element")
@@ -305,43 +298,27 @@ func getWindowStart(timestamp, windowSize int64) int64 {
}
// eventTimeTriggerOnElement processes element(s) with EventTimeTrigger.
-func eventTimeTriggerOnElement(window timeWindow, ctx *triggerContext) triggerResult {
- if window.MaxTimestamp() <= ctx.GetCurrentWatermark() {
+func (s *tumblingTimeWindows) eventTimeTriggerOnElement(window timeWindow) triggerResult {
+ if window.MaxTimestamp() <= s.currentWatermark {
// if watermark is already past the window fire immediately
return fire
}
- ctx.RegisterEventTimeTimer(window.MaxTimestamp())
- return cont
-}
-
-type triggerContext struct {
- delegation *tumblingTimeWindows
- window timeWindow
-}
-
-func (ctx *triggerContext) GetCurrentWatermark() int64 {
- return ctx.delegation.currentWatermark
-}
-
-func (ctx *triggerContext) RegisterEventTimeTimer(triggerTime int64) {
- ctx.delegation.timerMu.Lock()
- defer ctx.delegation.timerMu.Unlock()
- heap.Push(ctx.delegation.timerHeap, &internalTimer{
- triggerTimeMillis: triggerTime,
- w: ctx.window,
+ s.timerMu.Lock()
+ defer s.timerMu.Unlock()
+ heap.Push(s.timerHeap, &internalTimer{
+ w: window,
})
+ return cont
}
-func (ctx *triggerContext) OnElement(_ flow.StreamRecord) triggerResult {
- return eventTimeTriggerOnElement(ctx.window, ctx)
-}
-
-var _ flow.Element = (*internalTimer)(nil)
+var (
+ _ flow.Element = (*internalTimer)(nil)
+ _ flow.HashableElement = (*internalTimer)(nil)
+)
type internalTimer struct {
- w timeWindow
- triggerTimeMillis int64
- index int
+ w timeWindow
+ index int
}
func (t *internalTimer) GetIndex() int {
@@ -351,3 +328,14 @@ func (t *internalTimer) GetIndex() int {
func (t *internalTimer) SetIndex(idx int) {
t.index = idx
}
+
+func (t *internalTimer) Equal(other flow.HashableElement) bool {
+ if otherTimer, ok := other.(*internalTimer); ok {
+ return t.w.start == otherTimer.w.start && t.w.end == otherTimer.w.end
+ }
+ return false
+}
+
+func (t *internalTimer) Hash() uint64 {
+ return uint64(t.w.start)<<32 | uint64(t.w.end)
+}
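One note on the Hash implementation above: with real millisecond epoch timestamps, start and end both exceed 32 bits, so the shift-and-OR packing can map different windows to the same value. That is acceptable here precisely because DedupPriorityQueue keeps a bucket per hash and falls back to Equal for content comparison. A stand-alone restatement of the packing (not the committed code):

package main

import "fmt"

// timerHash restates internalTimer.Hash from the patch: pack the window
// bounds into one word. It is not collision-free for 64-bit millisecond
// values; Equal resolves any bucket collisions.
func timerHash(start, end int64) uint64 {
	return uint64(start)<<32 | uint64(end)
}

func main() {
	// Two adjacent one-minute windows on a millisecond epoch clock.
	fmt.Printf("%#x\n", timerHash(1_700_000_000_000, 1_700_000_060_000))
	fmt.Printf("%#x\n", timerHash(1_700_000_060_000, 1_700_000_120_000))
}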
diff --git a/pkg/flow/streaming/sliding_window_test.go b/pkg/flow/streaming/sliding_window_test.go
index cee7973d..28077bc8 100644
--- a/pkg/flow/streaming/sliding_window_test.go
+++ b/pkg/flow/streaming/sliding_window_test.go
@@ -18,6 +18,7 @@
package streaming
import (
+ "container/heap"
"context"
"time"
@@ -136,4 +137,140 @@ var _ = g.Describe("Sliding Window", func() {
}).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
})
})
+
+ g.Describe("Timer Heap Deduplication", func() {
+ var timerHeap *flow.DedupPriorityQueue
+
+ g.BeforeEach(func() {
+ timerHeap = flow.NewPriorityQueue(func(a, b interface{}) int {
+ return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp())
+ }, false)
+ })
+
+ g.It("Should deduplicate same reference internalTimer objects", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000},
+ }
+
+ // Push the same reference twice
+ heap.Push(timerHeap, timer1)
+ heap.Push(timerHeap, timer1)
+
+ // Should only have one item due to reference-based deduplication
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1))
+ })
+
+ g.It("Should deduplicate different internalTimer objects with same window content", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000},
+ }
+ timer2 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Same window content
+ }
+
+ // Push different objects with same content
+ heap.Push(timerHeap, timer1)
+ heap.Push(timerHeap, timer2)
+
+ // Should only have one item due to content-based deduplication
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1))
+ })
+
+ g.It("Should not deduplicate internalTimer objects with different windows", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000},
+ }
+ timer2 := &internalTimer{
+ w: timeWindow{start: 2000, end: 3000}, // Different window
+ }
+
+ // Push different objects with different content
+ heap.Push(timerHeap, timer1)
+ heap.Push(timerHeap, timer2)
+
+ // Should have two items as they have different content
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+ })
+
+ g.It("Should maintain proper ordering after deduplication", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 3000, end: 4000}, // Later timestamp
+ }
+ timer2 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Earlier timestamp
+ }
+ timer3 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2
+ }
+
+ // Push in order: later, earlier, duplicate
+ heap.Push(timerHeap, timer1)
+ heap.Push(timerHeap, timer2)
+ heap.Push(timerHeap, timer3) // Should be deduplicated
+
+ // Should only have 2 items
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+
+ // Peek should return the earliest timer (timer2)
+ earliest := timerHeap.Peek().(*internalTimer)
+ gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000)))
+ gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000)))
+ })
+
+ g.It("Should verify Hash and Equal methods work correctly", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000},
+ }
+ timer2 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Same content
+ }
+ timer3 := &internalTimer{
+ w: timeWindow{start: 1000, end: 3000}, // Different end
+ }
+
+ // Test Hash method
+ gomega.Expect(timer1.Hash()).Should(gomega.Equal(timer2.Hash()))
+ gomega.Expect(timer1.Hash()).ShouldNot(gomega.Equal(timer3.Hash()))
+
+ // Test Equal method
+ gomega.Expect(timer1.Equal(timer2)).Should(gomega.BeTrue())
+ gomega.Expect(timer1.Equal(timer3)).Should(gomega.BeFalse())
+ })
+
+ g.It("Should work with heap operations", func() {
+ timer1 := &internalTimer{
+ w: timeWindow{start: 3000, end: 4000}, // Later
+ }
+ timer2 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Earlier
+ }
+ timer3 := &internalTimer{
+ w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2
+ }
+
+ // Initialize heap
+ heap.Init(timerHeap)
+
+ // Push timers
+ heap.Push(timerHeap, timer1)
+ heap.Push(timerHeap, timer2)
+ heap.Push(timerHeap, timer3) // Should be deduplicated
+
+ // Should only have 2 items
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+
+ // Pop should return earliest first
+ earliest := heap.Pop(timerHeap).(*internalTimer)
+ gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000)))
+ gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000)))
+
+ // Next should be the later timer
+ later := heap.Pop(timerHeap).(*internalTimer)
+ gomega.Expect(later.w.start).Should(gomega.Equal(int64(3000)))
+ gomega.Expect(later.w.end).Should(gomega.Equal(int64(4000)))
+
+ // Heap should be empty now
+ gomega.Expect(timerHeap.Len()).Should(gomega.Equal(0))
+ })
+ })
})
diff --git a/pkg/flow/types.go b/pkg/flow/types.go
index 6e8576a4..20bf4c50 100644
--- a/pkg/flow/types.go
+++ b/pkg/flow/types.go
@@ -73,7 +73,7 @@ type Window interface {
type WindowAssigner interface {
// AssignWindows assigns a slice of Window according to the given timestamp, e.g. eventTime.
// The unit of the timestamp here is MilliSecond.
- AssignWindows(timestamp int64) ([]Window, error)
+ AssignWindows(timestamp int64) (Window, error)
}
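The narrowed signature encodes the fact that a tumbling-window assigner maps each timestamp to exactly one window, so the old one-element slice allocation per element disappears. A rough sketch of the contract, assuming a flooring getWindowStart (the real body is outside this patch):

package main

import (
	"errors"
	"fmt"
	"math"
)

// window is a stand-in for the flow.Window / timeWindow pair.
type window struct {
	start, end int64 // millisecond bounds, end exclusive
}

// assignWindow returns the single tumbling window containing timestamp.
// The flooring below is an assumption; the committed getWindowStart may
// handle negative timestamps differently.
func assignWindow(timestamp, windowSize int64) (window, error) {
	if timestamp == math.MinInt64 {
		return window{}, errors.New("invalid timestamp from the element")
	}
	start := timestamp - (timestamp % windowSize)
	return window{start: start, end: start + windowSize}, nil
}

func main() {
	w, err := assignWindow(1_700_000_012_345, 60_000)
	if err != nil {
		panic(err)
	}
	fmt.Printf("[%d, %d)\n", w.start, w.end) // exactly one window per element
}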
// AggregationOp defines the stateful operation for aggregation.