This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.5.x-fromIterable in repository https://gitbox.apache.org/repos/asf/pekko.git
commit bf8e4709a6e81a4c4e39de65cf8ea0dfe1c26f31 Author: He-Pin(kerr) <[email protected]> AuthorDate: Sun Dec 14 19:32:33 2025 +0800 chore: Remove CollectionUtil (#2582) (cherry picked from commit f735cca1a5dea740ea79ea643b9470991979cedd) # Conflicts: # stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala # stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala --- .../scala/org/apache/pekko/util/Collections.scala | 24 +++++++++++++ .../pekko/stream/javadsl/CollectionUtil.scala | 39 ---------------------- .../org/apache/pekko/stream/javadsl/Flow.scala | 6 ++-- .../org/apache/pekko/stream/javadsl/Sink.scala | 3 +- .../org/apache/pekko/stream/javadsl/Source.scala | 19 ++++++----- .../org/apache/pekko/stream/javadsl/SubFlow.scala | 6 ++-- .../apache/pekko/stream/javadsl/SubSource.scala | 6 ++-- .../org/apache/pekko/stream/javadsl/Tcp.scala | 13 ++++---- 8 files changed, 56 insertions(+), 60 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/Collections.scala b/actor/src/main/scala/org/apache/pekko/util/Collections.scala index 1fa5003da1..12f9114959 100644 --- a/actor/src/main/scala/org/apache/pekko/util/Collections.scala +++ b/actor/src/main/scala/org/apache/pekko/util/Collections.scala @@ -20,6 +20,30 @@ import scala.collection.immutable * INTERNAL API */ private[pekko] object Collections { + // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, + // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. 
+ // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458 + private object NotApplied extends (Any => Any) { + final override def apply(v1: Any): Any = this + } + + implicit class IterableOps[T](val iterable: java.lang.Iterable[T]) extends AnyVal { + def collectToImmutableSeq[R](pf: PartialFunction[T, R]): immutable.Seq[R] = { + val builder = immutable.Seq.newBuilder[R] + iterable.forEach((t: T) => { + // 1. `applyOrElse` is faster than (`pf.isDefinedAt` and then `pf.apply`) + // 2. using reference comparison here instead of pattern matching can generate smaller and faster bytecode, + // e.g. just a simple `IF_ACMPNE`, and you can find the same trick in the `CollectWhile` operator. + // If you are interested, you can check the associated PR for this change and the + // current implementation of `scala.collection.IterableOnceOps.collectFirst`. + pf.applyOrElse(t, NotApplied) match { + case _: NotApplied.type => // do nothing + case r: R @unchecked => builder += r + } + }) + builder.result() + } + } case object EmptyImmutableSeq extends immutable.Seq[Nothing] { override final def iterator = Iterator.empty diff --git a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala b/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala deleted file mode 100644 index 1f83183088..0000000000 --- a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pekko.stream -package javadsl - -import scala.collection.immutable - -import org.apache.pekko -import pekko.annotation.InternalApi -import pekko.util.ccompat.JavaConverters._ - -/** - * INTERNAL API - * - * Utility methods for converting Java collections to Scala collections. - */ -@InternalApi -private[javadsl] object CollectionUtil { - @inline def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] = - jlist.asScala.toSeq - - @inline def toSeq[T](jiterable: java.lang.Iterable[T]): immutable.Seq[T] = - jiterable.asScala.toSeq -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 24e6359300..217c8f2412 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3684,7 +3684,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -3766,7 +3767,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): 
javadsl.Flow[In, Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index ead3f1a28f..d7378def02 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -521,7 +521,8 @@ object Sink { sinks: java.util.List[_ <: Graph[SinkShape[U], M]], fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]) : Sink[T, java.util.List[M]] = { - val seq = if (sinks != null) CollectionUtil.toSeq(sinks).collect { + import pekko.util.Collections._ + val seq = if (sinks ne null) sinks.collectToImmutableSeq { case sink: Sink[U @unchecked, M @unchecked] => sink.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 5cc5401219..2ee0cc7ade 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -745,7 +745,7 @@ object Source { @deprecatedName(Symbol("strategy")) fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, NotUsed] = { - val seq = if (rest != null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq() + val seq = if (rest ne null) rest.asScala.map(_.asScala).toSeq else immutable.Seq() new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num))) } @@ -772,7 +772,8 @@ object Source { sources: java.util.List[_ <: Graph[SourceShape[T], M]], fanInStrategy: 
function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, java.util.List[M]] = { - val seq = if (sources != null) CollectionUtil.toSeq(sources).collect { + import pekko.util.Collections._ + val seq = if (sources ne null) sources.collectToImmutableSeq { case source: Source[T @unchecked, M @unchecked] => source.asScala case other => other } @@ -784,7 +785,7 @@ object Source { * Combine the elements of multiple streams into a stream of lists. */ def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = { - val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() + val seq = if (sources ne null) sources.asScala.map(_.asScala).toVector else immutable.Seq() new Source(scaladsl.Source.zipN(seq).map(_.asJava)) } @@ -794,7 +795,7 @@ object Source { def zipWithN[T, O]( zipper: function.Function[java.util.List[T], O], sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = { - val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() + val seq = if (sources ne null) sources.asScala.map(_.asScala).toVector else immutable.Seq() new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq)) } @@ -1039,8 +1040,8 @@ object Source { sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]], eagerComplete: Boolean): javadsl.Source[T, NotUsed] = { val seq = - if (sourcesAndPriorities != null) - CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue())) + if (sourcesAndPriorities ne null) + sourcesAndPriorities.asScala.map(pair => (pair.first.asScala, pair.second.intValue())).toVector else immutable.Seq() new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete)) @@ -1820,7 +1821,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], 
segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -1900,7 +1902,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 9127e9323f..08c993f015 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2491,7 +2491,8 @@ class SubFlow[In, Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2549,7 +2550,8 @@ class SubFlow[In, Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 0f798dc2dd..490ccba4e0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2455,7 +2455,8 @@ class SubSource[Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubSource[Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2514,7 +2515,8 @@ class SubSource[Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubSource[Out, Mat] = { - val seq = if (those != null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala index fed4044733..af0c5de0c5 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala @@ -36,6 +36,7 @@ import pekko.actor.ExtendedActorSystem import pekko.actor.ExtensionId import pekko.actor.ExtensionIdProvider import pekko.annotation.InternalApi +import pekko.japi.Util.immutableSeq import pekko.io.Inet.SocketOption import pekko.stream.Materializer import pekko.stream.SystemMaterializer @@ -181,7 +182,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] = 
Source.fromGraph( delegate - .bind(interface, port, backlog, CollectionUtil.toSeq(options), halfClose, optionalDurationToScala(idleTimeout)) + .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout)) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) @@ -263,7 +264,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { .outgoingConnection( remoteAddress, localAddress.toScala, - CollectionUtil.toSeq(options), + immutableSeq(options), halfClose, optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout)) @@ -368,7 +369,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { sslContext, negotiateNewSession, localAddress.toScala, - CollectionUtil.toSeq(options), + immutableSeq(options), connectTimeout, idleTimeout) .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) @@ -416,7 +417,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { remoteAddress, createSSLEngine = () => createSSLEngine.get(), localAddress.toScala, - CollectionUtil.toSeq(options), + immutableSeq(options), optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout), session => @@ -453,7 +454,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( delegate - .bindTls(interface, port, sslContext, negotiateNewSession, backlog, CollectionUtil.toSeq(options), idleTimeout) + .bindTls(interface, port, sslContext, negotiateNewSession, backlog, immutableSeq(options), idleTimeout) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) @@ -517,7 +518,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { port, createSSLEngine = () => createSSLEngine.get(), backlog, - CollectionUtil.toSeq(options), + 
immutableSeq(options), optionalDurationToScala(idleTimeout), session => verifySession.apply(session).toScala match { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
