This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f735cca1a5 chore: Remove CollectionUtil (#2582)
f735cca1a5 is described below
commit f735cca1a5dea740ea79ea643b9470991979cedd
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Dec 14 19:32:33 2025 +0800
chore: Remove CollectionUtil (#2582)
---
.../scala/org/apache/pekko/util/Collections.scala | 24 +++++++++++++
.../pekko/stream/javadsl/CollectionUtil.scala | 39 ----------------------
.../org/apache/pekko/stream/javadsl/Flow.scala | 6 ++--
.../org/apache/pekko/stream/javadsl/Sink.scala | 3 +-
.../org/apache/pekko/stream/javadsl/Source.scala | 21 ++++++++----
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 6 ++--
.../apache/pekko/stream/javadsl/SubSource.scala | 6 ++--
.../org/apache/pekko/stream/javadsl/Tcp.scala | 9 ++---
8 files changed, 57 insertions(+), 57 deletions(-)
diff --git a/actor/src/main/scala/org/apache/pekko/util/Collections.scala
b/actor/src/main/scala/org/apache/pekko/util/Collections.scala
index 1fa5003da1..12f9114959 100644
--- a/actor/src/main/scala/org/apache/pekko/util/Collections.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/Collections.scala
@@ -20,6 +20,30 @@ import scala.collection.immutable
* INTERNAL API
*/
private[pekko] object Collections {
+ // Cached function that can be used with PartialFunction.applyOrElse to
ensure that A) the guard is only applied once,
+ // and B) the caller can compare the returned value against NotApplied to
query whether the PF was applied or not.
+ // Prior art:
https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
+ private object NotApplied extends (Any => Any) {
+ final override def apply(v1: Any): Any = this
+ }
+
+ implicit class IterableOps[T](val iterable: java.lang.Iterable[T]) extends
AnyVal {
+ def collectToImmutableSeq[R](pf: PartialFunction[T, R]): immutable.Seq[R]
= {
+ val builder = immutable.Seq.newBuilder[R]
+ iterable.forEach((t: T) => {
+ // 1. `applyOrElse` is faster than (`pf.isDefinedAt` and then
`pf.apply`)
+ // 2. using a reference comparison here instead of pattern matching can
generate smaller and faster bytecode,
+ // e.g. just a simple `IF_ACMPNE`; the same trick is used in the
`CollectWhile` operator.
+ // If you are interested, see the associated PR for this change
and the
+ // current implementation of
`scala.collection.IterableOnceOps.collectFirst`.
+ pf.applyOrElse(t, NotApplied) match {
+ case _: NotApplied.type => // do nothing
+ case r: R @unchecked => builder += r
+ }
+ })
+ builder.result()
+ }
+ }
case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
override final def iterator = Iterator.empty
diff --git
a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala
b/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala
deleted file mode 100644
index 56b4a02844..0000000000
---
a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pekko.stream
-package javadsl
-
-import scala.collection.immutable
-import scala.jdk.CollectionConverters._
-
-import org.apache.pekko
-import pekko.annotation.InternalApi
-
-/**
- * INTERNAL API
- *
- * Utility methods for converting Java collections to Scala collections.
- */
-@InternalApi
-private[javadsl] object CollectionUtil {
- @inline def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] =
- jlist.asScala.toSeq
-
- @inline def toSeq[T](jiterable: java.lang.Iterable[T]): immutable.Seq[T] =
- jiterable.asScala.toSeq
-}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1c985575d3..88f40cee84 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -3495,7 +3495,8 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@@ -3577,7 +3578,8 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index b280ae3a79..bcf53dbbf0 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -465,7 +465,8 @@ object Sink {
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
- val seq = if (sinks ne null) CollectionUtil.toSeq(sinks).collect {
+ import pekko.util.Collections._
+ val seq = if (sinks ne null) sinks.collectToImmutableSeq {
case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
case other => other
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index f1bb9e7733..9df1019e60 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -549,7 +549,8 @@ object Source {
rest: java.util.List[Source[T, _ <: Any]],
fanInStrategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
- val seq = if (rest ne null) CollectionUtil.toSeq(rest).map(_.asScala) else
immutable.Seq()
+ import scala.jdk.CollectionConverters._
+ val seq = if (rest ne null) rest.asScala.map(_.asScala).toSeq else
immutable.Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq:
_*)(num => fanInStrategy.apply(num)))
}
@@ -574,7 +575,8 @@ object Source {
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer,
Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
- val seq = if (sources ne null) CollectionUtil.toSeq(sources).collect {
+ import pekko.util.Collections._
+ val seq = if (sources ne null) sources.collectToImmutableSeq {
case source: Source[T @unchecked, M @unchecked] => source.asScala
case other => other
}
@@ -586,7 +588,8 @@ object Source {
* Combine the elements of multiple streams into a stream of lists.
*/
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]):
Source[java.util.List[T], NotUsed] = {
- val seq = if (sources ne null)
CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
+ import scala.jdk.CollectionConverters._
+ val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else
immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
}
@@ -596,7 +599,8 @@ object Source {
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
- val seq = if (sources ne null)
CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
+ import scala.jdk.CollectionConverters._
+ val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else
immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq =>
zipper.apply(seq.asJava))(seq))
}
@@ -844,9 +848,10 @@ object Source {
def mergePrioritizedN[T](
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any],
java.lang.Integer]],
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
+ import scala.jdk.CollectionConverters._
val seq =
if (sourcesAndPriorities ne null)
- CollectionUtil.toSeq(sourcesAndPriorities).map(pair =>
(pair.first.asScala, pair.second.intValue()))
+ sourcesAndPriorities.asScala.map(pair => (pair.first.asScala,
pair.second.intValue())).toSeq
else
immutable.Seq()
new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete))
@@ -1625,7 +1630,8 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@@ -1705,7 +1711,8 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 0525aa9cb7..fa3ee316b1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -2368,7 +2368,8 @@ final class SubFlow[In, Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@@ -2426,7 +2427,8 @@ final class SubFlow[In, Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index d93246a5a3..6324c447f9 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -2334,7 +2334,8 @@ final class SubSource[Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubSource[Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@@ -2393,7 +2394,8 @@ final class SubSource[Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubSource[Out, Mat] = {
- val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
+ import pekko.util.Collections._
+ val seq = if (those ne null) those.collectToImmutableSeq {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
index dd4a64a99f..22fe45c4eb 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
@@ -42,6 +42,7 @@ import pekko.stream.SystemMaterializer
import pekko.stream.TLSClosing
import pekko.stream.scaladsl
import pekko.util.ByteString
+import pekko.japi.Util.immutableSeq
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
@@ -177,7 +178,7 @@ class Tcp(system: ExtendedActorSystem) extends
pekko.actor.Extension {
idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection,
CompletionStage[ServerBinding]] =
Source.fromGraph(
delegate
- .bind(interface, port, backlog, CollectionUtil.toSeq(options),
halfClose, optionalDurationToScala(idleTimeout))
+ .bind(interface, port, backlog, immutableSeq(options), halfClose,
optionalDurationToScala(idleTimeout))
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
@@ -228,7 +229,7 @@ class Tcp(system: ExtendedActorSystem) extends
pekko.actor.Extension {
.outgoingConnection(
remoteAddress,
localAddress.toScala,
- CollectionUtil.toSeq(options),
+ immutableSeq(options),
halfClose,
optionalDurationToScala(connectTimeout),
optionalDurationToScala(idleTimeout))
@@ -291,7 +292,7 @@ class Tcp(system: ExtendedActorSystem) extends
pekko.actor.Extension {
remoteAddress,
createSSLEngine = () => createSSLEngine.create(),
localAddress.toScala,
- CollectionUtil.toSeq(options),
+ immutableSeq(options),
optionalDurationToScala(connectTimeout),
optionalDurationToScala(idleTimeout),
session =>
@@ -342,7 +343,7 @@ class Tcp(system: ExtendedActorSystem) extends
pekko.actor.Extension {
port,
createSSLEngine = () => createSSLEngine.create(),
backlog,
- CollectionUtil.toSeq(options),
+ immutableSeq(options),
optionalDurationToScala(idleTimeout),
session =>
verifySession.apply(session).toScala match {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]