This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
commit e386eeecd690948bbcab9e631bf394f9cc4da619 Author: Scala Steward <[email protected]> AuthorDate: Sat Jan 17 16:29:27 2026 +0000 Reformat with scalafmt 3.10.4 Executed command: scalafmt --non-interactive --- build.sbt | 3 +- .../persistence/r2dbc/internal/BySliceQuery.scala | 9 +++-- .../persistence/r2dbc/internal/R2dbcExecutor.scala | 3 +- .../pekko/persistence/r2dbc/internal/SqlSpec.scala | 3 +- .../query/CurrentPersistenceIdsQuerySpec.scala | 3 +- .../r2dbc/query/EventsBySliceSpec.scala | 38 ++++++++++------------ .../state/CurrentPersistenceIdsQuerySpec.scala | 3 +- .../projection/r2dbc/EventSourcedPubSubSpec.scala | 14 ++++---- .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 3 +- .../r2dbc/TestSourceProviderWithInput.scala | 3 +- 10 files changed, 43 insertions(+), 39 deletions(-) diff --git a/build.sbt b/build.sbt index 387aff8..59d1151 100644 --- a/build.sbt +++ b/build.sbt @@ -104,7 +104,8 @@ lazy val docs = project "scala.binary.version" -> scalaBinaryVersion.value, "extref.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.PekkoVersionInDocs}/%s", "extref.pekko-docs.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.PekkoVersionInDocs}/%s", - "extref.pekko-projection.base_url" -> s"https://pekko.apache.org/docs/pekko-projection/${Dependencies.PekkoProjectionVersionInDocs}/%s", + "extref.pekko-projection.base_url" -> + s"https://pekko.apache.org/docs/pekko-projection/${Dependencies.PekkoProjectionVersionInDocs}/%s", "extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s", "scaladoc.scala.base_url" -> "https://www.scala-lang.org/api/current/", "scaladoc.org.apache.pekko.persistence.r2dbc.base_url" -> diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala index f3bacbc..a5d20cb 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala @@ -353,7 +353,8 @@ import org.slf4j.Logger val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0 val newState = if (settings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero && - (newIdleCount >= 5 || JDuration + (newIdleCount >= 5 || + JDuration .between(state.latestBacktracking.timestamp, state.latest.timestamp) .compareTo(halfBacktrackingWindow) > 0)) { // FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0` @@ -429,12 +430,14 @@ import org.slf4j.Logger maxSlice: Int, state: QueryState): Option[Future[QueryState]] = { // Don't run this too frequently - if ((state.buckets.isEmpty || JDuration + if ((state.buckets.isEmpty || + JDuration .between(state.buckets.createdAt, Instant.now()) .compareTo(eventBucketCountInterval) > 0) && // For Durable State we always refresh the bucket counts at the interval. For Event Sourced we know // that they don't change because events are append only. - (dao.countBucketsMayChange || state.buckets + (dao.countBucketsMayChange || + state.buckets .findTimeForLimit(state.latest.timestamp, settings.bufferSize) .isEmpty)) { diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala index 55dd88a..1c6b861 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -120,8 +120,7 @@ class R2dbcExecutor(val connectionFactory: ConnectionFactory, log: Logger, logDb private def nanoTime(): Long = if (logDbCallsExceedingEnabled) System.nanoTime() else 0L - private def durationInMicros(startTime: Long): Long = - (nanoTime() - startTime) / 1000 + private def durationInMicros(startTime: Long): Long = (nanoTime() - startTime) / 1000 private def getConnection(logPrefix: String): Future[Connection] = { val startTime = nanoTime() diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/SqlSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/SqlSpec.scala index c2c8b4d..0097fea 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/SqlSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/internal/SqlSpec.scala @@ -23,7 +23,8 @@ class SqlSpec extends AnyWordSpec with TestSuite with Matchers { "SQL string interpolation" should { "replace ? bind parameters with numbered $" in { sql"select * from bar where a = ?" shouldBe "select * from bar where a = $1" - sql"select * from bar where a = ? and b = ? and c = ?" shouldBe "select * from bar where a = $1 and b = $2 and c = $3" + sql"select * from bar where a = ? and b = ? and c = ?" shouldBe + "select * from bar where a = $1 and b = $2 and c = $3" sql"select * from bar" shouldBe "select * from bar" } diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala index 67c7ec8..0a59495 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala @@ -51,8 +51,7 @@ class CurrentPersistenceIdsQuerySpec private val zeros = "0000" private val entityType = nextEntityType() private val numberOfPids = 100 - private val pids = - (1 to numberOfPids).map(n => PersistenceId(entityType, "p" + zeros.drop(n.toString.length) + n)) + private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType, "p" + zeros.drop(n.toString.length) + n)) override protected def beforeAll(): Unit = { super.beforeAll() diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala index 05a6d11..e506c9f 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -290,18 +290,17 @@ class EventsBySliceSpec ranges(2) should be(512 to 767) ranges(3) should be(768 to 1023) - val allEnvelopes = - (0 until 4).flatMap { rangeIndex => - val result = - query - .currentEventsBySlices[String](entityType, ranges(rangeIndex).min, ranges(rangeIndex).max, NoOffset) - .runWith(Sink.seq) - .futureValue - result.foreach { env => - ranges(rangeIndex) should contain(query.sliceForPersistenceId(env.persistenceId)) - } - result + val allEnvelopes = (0 until 4).flatMap { rangeIndex => + val result = + query + .currentEventsBySlices[String](entityType, ranges(rangeIndex).min, ranges(rangeIndex).max, NoOffset) + .runWith(Sink.seq) + .futureValue + result.foreach { env => + ranges(rangeIndex) should contain(query.sliceForPersistenceId(env.persistenceId)) } + result + } allEnvelopes.size should be(numberOfPersisters * numberOfEvents) } } @@ -347,15 +346,14 @@ class EventsBySliceSpec ranges(2) should be(512 to 767) ranges(3) should be(768 to 1023) - val queries: Seq[Source[EventEnvelope[String], NotUsed]] = - (0 until 4).map { rangeIndex => - query - .eventsBySlices[String](entityType, ranges(rangeIndex).min, ranges(rangeIndex).max, NoOffset) - .map { env => - ranges(rangeIndex) should contain(query.sliceForPersistenceId(env.persistenceId)) - env - } - } + val queries: Seq[Source[EventEnvelope[String], NotUsed]] = (0 until 4).map { rangeIndex => + query + .eventsBySlices[String](entityType, ranges(rangeIndex).min, ranges(rangeIndex).max, NoOffset) + .map { env => + ranges(rangeIndex) should contain(query.sliceForPersistenceId(env.persistenceId)) + env + } + } val allEnvelopes = queries(0) .merge(queries(1)) diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala index d0df18b..e4de282 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala @@ -52,8 +52,7 @@ class CurrentPersistenceIdsQuerySpec private val zeros = "0000" private val entityType = nextEntityType() private val numberOfPids = 100 - private val pids = - (1 to numberOfPids).map(n => PersistenceId(entityType, "p" + zeros.drop(n.toString.length) + n)) + private val pids = (1 to numberOfPids).map(n => PersistenceId(entityType, "p" + zeros.drop(n.toString.length) + n)) override protected def beforeAll(): Unit = { super.beforeAll() diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala index 306d2d8..338cf42 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala @@ -234,16 +234,18 @@ class EventSourcedPubSubSpec val viaPubSub = processedByPid.filter(p => - p.envelope.offset.asInstanceOf[TimestampOffset].timestamp == p.envelope.offset - .asInstanceOf[TimestampOffset] - .readTimestamp) + p.envelope.offset.asInstanceOf[TimestampOffset].timestamp == + p.envelope.offset + .asInstanceOf[TimestampOffset] + .readTimestamp) log.info("via pub-sub {}: {}", pid: Any, viaPubSub.map(_.envelope.sequenceNr).mkString(", "): Any) } val countViaPubSub = processed.count(p => - p.envelope.offset.asInstanceOf[TimestampOffset].timestamp == p.envelope.offset - .asInstanceOf[TimestampOffset] - .readTimestamp) + p.envelope.offset.asInstanceOf[TimestampOffset].timestamp == + p.envelope.offset + .asInstanceOf[TimestampOffset] + .readTimestamp) log.info("Total via pub-sub: {}", countViaPubSub) countViaPubSub shouldBe >(0) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index aaac877..dc130b3 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -112,7 +112,8 @@ object R2dbcTimestampOffsetProjectionSpec { override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { Future.successful(envelopes.collectFirst { case env - if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset + if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && + env.offset .isInstanceOf[TimestampOffset] => env.offset.asInstanceOf[TimestampOffset].timestamp }) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala index 537df6b..a6e6b7e 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala @@ -88,7 +88,8 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { Future.successful(envelopes.iterator().asScala.collectFirst { case env - if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset + if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && + env.offset .isInstanceOf[TimestampOffset] => env.offset.asInstanceOf[TimestampOffset].timestamp }) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
