This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 474188b Reduce periodic latency spikes (#307)
474188b is described below
commit 474188b27be68e3fe084970060d2e47a1551d1ce
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Feb 4 00:37:41 2026 +0100
Reduce periodic latency spikes (#307)
* Reduce periodic latency spikes
* Update BySliceQuery.scala
* Create latency-spikes.excludes
* Update latency-spikes.excludes
---
.../latency-spikes.excludes | 23 ++++++++++++++++++++++
.../persistence/r2dbc/internal/BySliceQuery.scala | 20 +++++++++++++------
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 4 +++-
3 files changed, 40 insertions(+), 7 deletions(-)
diff --git
a/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes
b/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes
new file mode 100644
index 0000000..0a35781
--- /dev/null
+++
b/core/src/main/mima-filters/2.0.x.backwards.excludes/latency-spikes.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/307
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.copy$default$5")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState._5")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index a5d20cb..9be7ae7 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -43,7 +43,7 @@ import org.slf4j.Logger
object QueryState {
val empty: QueryState =
- QueryState(TimestampOffset.Zero, 0, 0, 0, backtracking = false,
TimestampOffset.Zero, Buckets.empty)
+ QueryState(TimestampOffset.Zero, 0, 0, 0, backtrackingCount = 0,
TimestampOffset.Zero, Buckets.empty)
}
final case class QueryState(
@@ -51,10 +51,12 @@ import org.slf4j.Logger
rowCount: Int,
queryCount: Long,
idleCount: Long,
- backtracking: Boolean,
+ backtrackingCount: Int,
latestBacktracking: TimestampOffset,
buckets: Buckets) {
+ def backtracking: Boolean = backtrackingCount > 0
+
def currentOffset: TimestampOffset =
if (backtracking) latestBacktracking
else latest
@@ -346,7 +348,8 @@ import org.slf4j.Logger
}
def switchFromBacktracking(state: QueryState): Boolean = {
- state.backtracking && state.rowCount < settings.bufferSize - 1
+ // backtrackingCount is for fairness, to not run too many backtracking
queries in a row
+ state.backtracking && (state.backtrackingCount >= 3 || state.rowCount <
settings.bufferSize - 1)
}
def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope,
NotUsed]]) = {
@@ -370,14 +373,19 @@ import org.slf4j.Logger
rowCount = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
- backtracking = true,
+ backtrackingCount = 1,
latestBacktracking = fromOffset)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
- state.copy(rowCount = 0, queryCount = state.queryCount + 1,
idleCount = newIdleCount, backtracking = false)
+ state.copy(rowCount = 0, queryCount = state.queryCount + 1,
idleCount = newIdleCount, backtrackingCount = 0)
} else {
// continue
- state.copy(rowCount = 0, queryCount = state.queryCount + 1,
idleCount = newIdleCount)
+ val newBacktrackingCount = if (state.backtracking)
state.backtrackingCount + 1 else 0
+ state.copy(
+ rowCount = 0,
+ queryCount = state.queryCount + 1,
+ idleCount = newIdleCount,
+ backtrackingCount = newBacktrackingCount)
}
val behindCurrentTime =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index f623d4e..2f8de36 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -171,7 +171,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
pubSub.eventTopic(entityType, slice) !
Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
}
}
-
dbSource.merge(pubSubSource).via(deduplicate(settings.deduplicateCapacity))
+ dbSource
+ .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
+ .via(deduplicate(settings.deduplicateCapacity))
} else
dbSource
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]