This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 474188b  Reduce periodic latency spikes (#307)
474188b is described below

commit 474188b27be68e3fe084970060d2e47a1551d1ce
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Feb 4 00:37:41 2026 +0100

    Reduce periodic latency spikes (#307)
    
    * Reduce periodic latency spikes
    
    * Update BySliceQuery.scala
    
    * Create latency-spikes.excludes
    
    * Update latency-spikes.excludes
---
 .../latency-spikes.excludes                        | 23 ++++++++++++++++++++++
 .../persistence/r2dbc/internal/BySliceQuery.scala  | 20 +++++++++++++------
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  4 +++-
 3 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes b/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes
new file mode 100644
index 0000000..0a35781
--- /dev/null
+++ b/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/325
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy$default$5")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._5")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index a5d20cb..9be7ae7 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -43,7 +43,7 @@ import org.slf4j.Logger
 
   object QueryState {
     val empty: QueryState =
-      QueryState(TimestampOffset.Zero, 0, 0, 0, backtracking = false, TimestampOffset.Zero, Buckets.empty)
+      QueryState(TimestampOffset.Zero, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, Buckets.empty)
   }
 
   final case class QueryState(
@@ -51,10 +51,12 @@ import org.slf4j.Logger
       rowCount: Int,
       queryCount: Long,
       idleCount: Long,
-      backtracking: Boolean,
+      backtrackingCount: Int,
       latestBacktracking: TimestampOffset,
       buckets: Buckets) {
 
+    def backtracking: Boolean = backtrackingCount > 0
+
     def currentOffset: TimestampOffset =
       if (backtracking) latestBacktracking
       else latest
@@ -346,7 +348,8 @@ import org.slf4j.Logger
     }
 
     def switchFromBacktracking(state: QueryState): Boolean = {
-      state.backtracking && state.rowCount < settings.bufferSize - 1
+      // backtrackingCount is for fairness, to not run too many backtracking queries in a row
+      state.backtracking && (state.backtrackingCount >= 3 || state.rowCount < settings.bufferSize - 1)
     }
 
     def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
@@ -370,14 +373,19 @@ import org.slf4j.Logger
             rowCount = 0,
             queryCount = state.queryCount + 1,
             idleCount = newIdleCount,
-            backtracking = true,
+            backtrackingCount = 1,
             latestBacktracking = fromOffset)
         } else if (switchFromBacktracking(state)) {
           // switch from backtracking
-          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtracking = false)
+          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount, backtrackingCount = 0)
         } else {
           // continue
-          state.copy(rowCount = 0, queryCount = state.queryCount + 1, idleCount = newIdleCount)
+          val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
+          state.copy(
+            rowCount = 0,
+            queryCount = state.queryCount + 1,
+            idleCount = newIdleCount,
+            backtrackingCount = newBacktrackingCount)
         }
 
       val behindCurrentTime =
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index f623d4e..2f8de36 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -171,7 +171,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
               pubSub.eventTopic(entityType, slice) ! Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
             }
           }
-      dbSource.merge(pubSubSource).via(deduplicate(settings.deduplicateCapacity))
+      dbSource
+        .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
+        .via(deduplicate(settings.deduplicateCapacity))
     } else
       dbSource
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to