This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 673da546c14 [#30083][prism] Factor out hold tracking to dedicated structures (#31105)
673da546c14 is described below

commit 673da546c1465c931fdbbc5769e7d566ff55b4d8
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Fri Apr 26 15:11:39 2024 -0700

    [#30083][prism] Factor out hold tracking to dedicated structures (#31105)
    
    * [prism] Factor out hold tracking to dedicated structures
    
    * review comment-reorder move code out of ladder.
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 .../prism/internal/engine/elementmanager.go        |  77 +++-----------
 .../prism/internal/engine/elementmanager_test.go   |   2 +-
 .../beam/runners/prism/internal/engine/holds.go    | 105 +++++++++++++++++++
 .../runners/prism/internal/engine/holds_test.go    | 115 +++++++++++++++++++++
 .../runners/prism/internal/engine/teststream.go    |  12 +--
 5 files changed, 236 insertions(+), 75 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index e40f5513dae..5d665edf286 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -414,7 +414,7 @@ func (em *ElementManager) checkForQuiescence(advanced 
set[string]) {
                outW := ss.OutputWatermark()
                upPCol, upW := ss.UpstreamWatermark()
                upS := em.pcolParents[upPCol]
-               stageState = append(stageState, fmt.Sprintln(id, "watermark 
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, 
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", 
ss.watermarkHoldsCounts))
+               stageState = append(stageState, fmt.Sprintln(id, "watermark 
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, 
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", 
ss.watermarkHolds.counts))
        }
        panic(fmt.Sprintf("nothing in progress and no refreshes with non zero 
pending elements: %v\n%v", v, strings.Join(stageState, "")))
 }
@@ -706,18 +706,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, 
col2Coders map[string]PCol
        delete(stage.inprogressKeysByBundle, rb.BundleID)
 
        for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
-               n := stage.watermarkHoldsCounts[hold] - v
-               if n == 0 {
-                       delete(stage.watermarkHoldsCounts, hold)
-                       for i, h := range stage.watermarkHoldHeap {
-                               if hold == h {
-                                       heap.Remove(&stage.watermarkHoldHeap, i)
-                                       break
-                               }
-                       }
-               } else {
-                       stage.watermarkHoldsCounts[hold] = n
-               }
+               stage.watermarkHolds.Drop(hold, v)
        }
        delete(stage.inprogressHoldsByBundle, rb.BundleID)
 
@@ -918,8 +907,7 @@ type stageState struct {
        // We track the count of timers with the same hold, and clear it from
        // the map and heap when the count goes to zero.
        // This avoids scanning the heap to remove or access a hold for each 
element.
-       watermarkHoldsCounts    map[mtime.Time]int
-       watermarkHoldHeap       holdHeap
+       watermarkHolds          *holdTracker
        inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to 
associated holds.
 }
 
@@ -940,37 +928,15 @@ type dataAndTimers struct {
        timers   map[timerKey]timerTimes
 }
 
-// holdHeap orders holds based on their timestamps
-// so we can always find the minimum timestamp of pending holds.
-type holdHeap []mtime.Time
-
-func (h holdHeap) Len() int           { return len(h) }
-func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
-func (h holdHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
-
-func (h *holdHeap) Push(x any) {
-       // Push and Pop use pointer receivers because they modify the slice's 
length,
-       // not just its contents.
-       *h = append(*h, x.(mtime.Time))
-}
-
-func (h *holdHeap) Pop() any {
-       old := *h
-       n := len(old)
-       x := old[n-1]
-       *h = old[0 : n-1]
-       return x
-}
-
 // makeStageState produces an initialized stageState.
 func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) 
*stageState {
        ss := &stageState{
-               ID:                   ID,
-               outputIDs:            outputIDs,
-               sides:                sides,
-               strat:                defaultStrat{},
-               state:                
map[LinkID]map[typex.Window]map[string]StateData{},
-               watermarkHoldsCounts: map[mtime.Time]int{},
+               ID:             ID,
+               outputIDs:      outputIDs,
+               sides:          sides,
+               strat:          defaultStrat{},
+               state:          
map[LinkID]map[typex.Window]map[string]StateData{},
+               watermarkHolds: newHoldTracker(),
 
                input:           mtime.MinTimestamp,
                output:          mtime.MinTimestamp,
@@ -1016,29 +982,13 @@ func (ss *stageState) AddPending(newPending []element) 
int {
                                        // don't increase the count this time, 
as "this" timer is already pending.
                                        count--
                                        // clear out the existing hold for 
accounting purposes.
-                                       v := 
ss.watermarkHoldsCounts[lastSet.hold] - 1
-                                       if v == 0 {
-                                               delete(ss.watermarkHoldsCounts, 
lastSet.hold)
-                                               for i, hold := range 
ss.watermarkHoldHeap {
-                                                       if hold == lastSet.hold 
{
-                                                               
heap.Remove(&ss.watermarkHoldHeap, i)
-                                                               break
-                                                       }
-                                               }
-                                       } else {
-                                               
ss.watermarkHoldsCounts[lastSet.hold] = v
-                                       }
+                                       ss.watermarkHolds.Drop(lastSet.hold, 1)
                                }
                                // Update the last set time on the timer.
                                dnt.timers[timerKey{family: e.family, tag: 
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: 
e.holdTimestamp}
 
                                // Mark the hold in the heap.
-                               ss.watermarkHoldsCounts[e.holdTimestamp] = 
ss.watermarkHoldsCounts[e.holdTimestamp] + 1
-
-                               if len(ss.watermarkHoldsCounts) != 
len(ss.watermarkHoldHeap) {
-                                       // The hold should not be in the heap, 
so we add it.
-                                       heap.Push(&ss.watermarkHoldHeap, 
e.holdTimestamp)
-                               }
+                               ss.watermarkHolds.Add(e.holdTimestamp, 1)
                        }
                }
                return count
@@ -1308,10 +1258,7 @@ func (ss *stageState) updateWatermarks(em 
*ElementManager) set[string] {
        defer ss.mu.Unlock()
 
        minPending := ss.minPendingTimestampLocked()
-       minWatermarkHold := mtime.MaxTimestamp
-       if ss.watermarkHoldHeap.Len() > 0 {
-               minWatermarkHold = ss.watermarkHoldHeap[0]
-       }
+       minWatermarkHold := ss.watermarkHolds.Min()
 
        // PCollection watermarks are based on their parents's output watermark.
        _, newIn := ss.UpstreamWatermark()
diff --git 
a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
index 7235508f164..275dd790d2b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
@@ -295,7 +295,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
                        ss.output = test.initOutput
                        ss.updateUpstreamWatermark(inputCol, test.upstream)
                        ss.pending = append(ss.pending, element{timestamp: 
test.minPending})
-                       ss.watermarkHoldHeap = append(ss.watermarkHoldHeap, 
test.minStateHold)
+                       ss.watermarkHolds.Add(test.minStateHold, 1)
                        ss.updateWatermarks(em)
                        if got, want := ss.input, test.wantInput; got != want {
                                pcol, up := ss.UpstreamWatermark()
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
new file mode 100644
index 00000000000..9077b3f439d
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+       "container/heap"
+       "fmt"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+// holdHeap orders holds based on their timestamps
+// so we can always find the minimum timestamp of pending holds.
+type holdHeap []mtime.Time
+
+func (h holdHeap) Len() int           { return len(h) }
+func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h holdHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *holdHeap) Push(x any) {
+       // Push and Pop use pointer receivers because they modify the slice's 
length,
+       // not just its contents.
+       *h = append(*h, x.(mtime.Time))
+}
+
+func (h *holdHeap) Pop() any {
+       old := *h
+       n := len(old)
+       x := old[n-1]
+       *h = old[0 : n-1]
+       return x
+}
+
+// holdTracker track the watermark holds for a stage.
+//
+// Timers hold back the watermark until they fire, but multiple
+// timers may set the same watermark hold.
+// To track when the watermark may advance further this structure maintains
+// counts for each set watermark hold.
+// As timers are processed, their associated holds are removed, reducing the 
counts.
+//
+// A heap of the hold times is kept so we have quick access to the minimum 
hold, for calculating
+// how to advance the watermark.
+type holdTracker struct {
+       heap   holdHeap
+       counts map[mtime.Time]int
+}
+
+func newHoldTracker() *holdTracker {
+       return &holdTracker{
+               counts: map[mtime.Time]int{},
+       }
+}
+
+// Drop the given hold count. When the count of a hold time reaches zero, it's
+// removed from the heap. Drop panics if holds become negative.
+func (ht *holdTracker) Drop(hold mtime.Time, v int) {
+       n := ht.counts[hold] - v
+       if n > 0 {
+               ht.counts[hold] = n
+               return
+       } else if n < 0 {
+               panic(fmt.Sprintf("prism error: negative watermark hold count 
%v for time %v", n, hold))
+       }
+       delete(ht.counts, hold)
+       for i, h := range ht.heap {
+               if hold == h {
+                       heap.Remove(&ht.heap, i)
+                       break
+               }
+       }
+}
+
+// Add a hold a number of times to heap. If the hold time isn't already 
present in the heap, it is added.
+func (ht *holdTracker) Add(hold mtime.Time, v int) {
+       // Mark the hold in the heap.
+       ht.counts[hold] = ht.counts[hold] + v
+
+       if len(ht.counts) != len(ht.heap) {
+               // Since there's a difference, the hold should not be in the 
heap, so we add it.
+               heap.Push(&ht.heap, hold)
+       }
+}
+
+// Min returns the earliest hold in the heap. Returns [mtime.MaxTimestamp] if 
the heap is empty.
+func (ht *holdTracker) Min() mtime.Time {
+       minWatermarkHold := mtime.MaxTimestamp
+       if len(ht.heap) > 0 {
+               minWatermarkHold = ht.heap[0]
+       }
+       return minWatermarkHold
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
new file mode 100644
index 00000000000..91de51bc1af
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+func TestHoldTracker(t *testing.T) {
+
+       type op func(*holdTracker)
+       add := func(hold mtime.Time, count int) op {
+               return func(ht *holdTracker) {
+                       ht.Add(hold, count)
+               }
+       }
+
+       drop := func(hold mtime.Time, count int) op {
+               return func(ht *holdTracker) {
+                       ht.Drop(hold, count)
+               }
+       }
+
+       tests := []struct {
+               name    string
+               ops     []op
+               wantMin mtime.Time
+               wantLen int
+       }{
+               {
+                       name:    "zero-max",
+                       wantMin: mtime.MaxTimestamp,
+                       wantLen: 0,
+               }, {
+
+                       name: "one-min",
+                       ops: []op{
+                               add(mtime.MinTimestamp, 1),
+                       },
+                       wantMin: mtime.MinTimestamp,
+                       wantLen: 1,
+               }, {
+
+                       name: "cleared-max",
+                       ops: []op{
+                               add(mtime.MinTimestamp, 1),
+                               drop(mtime.MinTimestamp, 1),
+                       },
+                       wantMin: mtime.MaxTimestamp,
+                       wantLen: 0,
+               }, {
+                       name: "cleared-non-eogw",
+                       ops: []op{
+                               add(mtime.MinTimestamp, 1),
+                               add(mtime.EndOfGlobalWindowTime, 1),
+                               drop(mtime.MinTimestamp, 1),
+                       },
+                       wantMin: mtime.EndOfGlobalWindowTime,
+                       wantLen: 1,
+               }, {
+                       name: "uncleared-non-min",
+                       ops: []op{
+                               add(mtime.MinTimestamp, 2),
+                               add(mtime.EndOfGlobalWindowTime, 1),
+                               drop(mtime.MinTimestamp, 1),
+                       },
+                       wantMin: mtime.MinTimestamp,
+                       wantLen: 2,
+               }, {
+                       name: "uncleared-non-min",
+                       ops: []op{
+                               add(1, 1),
+                               add(2, 1),
+                               add(3, 1),
+                               drop(2, 1),
+                               add(4, 1),
+                               add(3, 1),
+                               drop(1, 1),
+                               add(2, 1),
+                               drop(4, 1),
+                       },
+                       wantMin: 2,
+                       wantLen: 2,
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       tracker := newHoldTracker()
+                       for _, op := range test.ops {
+                               op(tracker)
+                       }
+                       if got, want := tracker.Min(), test.wantMin; got != 
want {
+                               t.Errorf("tracker.heap.Min() = %v, want %v", 
got, want)
+                       }
+                       if got, want := tracker.heap.Len(), test.wantLen; got 
!= want {
+                               t.Errorf("tracker.heap.Len() = %v, want %v", 
got, want)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
index f0350064d52..34b79d455ce 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
@@ -16,7 +16,6 @@
 package engine
 
 import (
-       "container/heap"
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -139,13 +138,9 @@ func (ts *testStreamHandler) UpdateHold(em 
*ElementManager, newHold mtime.Time)
        ss.mu.Lock()
        defer ss.mu.Unlock()
 
-       if ss.watermarkHoldsCounts[ts.currentHold] > 0 {
-               heap.Pop(&ss.watermarkHoldHeap)
-               ss.watermarkHoldsCounts[ts.currentHold] = 
ss.watermarkHoldsCounts[ts.currentHold] - 1
-       }
+       ss.watermarkHolds.Drop(ts.currentHold, 1)
        ts.currentHold = newHold
-       heap.Push(&ss.watermarkHoldHeap, ts.currentHold)
-       ss.watermarkHoldsCounts[ts.currentHold] = 1
+       ss.watermarkHolds.Add(ts.currentHold, 1)
 
        // kick the TestStream and Impulse stages too.
        kick := singleSet(ts.ID)
@@ -281,8 +276,7 @@ func (tsi *testStreamImpl) initHandler(id string) {
                tsi.em.addPending(1) // We subtrack a pending after event 
execution, so add one now for the final event to avoid a race condition.
 
                // Arrest the watermark initially to prevent terminal 
advancement.
-               heap.Push(&ss.watermarkHoldHeap, 
tsi.em.testStreamHandler.currentHold)
-               ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 
1
+               ss.watermarkHolds.Add(tsi.em.testStreamHandler.currentHold, 1)
        }
 }
 

Reply via email to