This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch collectionUitl in repository https://gitbox.apache.org/repos/asf/pekko.git
commit c5261f4786a0be2e2cedb721597a8ea43d7d5f99 Author: He-Pin <[email protected]> AuthorDate: Sun Dec 14 16:06:21 2025 +0800 chore: Remove CollectionUtil --- .../scala/org/apache/pekko/util/Collections.scala | 24 +++++++++++++ .../pekko/stream/javadsl/CollectionUtil.scala | 39 ---------------------- .../org/apache/pekko/stream/javadsl/Flow.scala | 6 ++-- .../org/apache/pekko/stream/javadsl/Sink.scala | 3 +- .../org/apache/pekko/stream/javadsl/Source.scala | 21 ++++++++---- .../org/apache/pekko/stream/javadsl/SubFlow.scala | 6 ++-- .../apache/pekko/stream/javadsl/SubSource.scala | 6 ++-- .../org/apache/pekko/stream/javadsl/Tcp.scala | 9 ++--- 8 files changed, 57 insertions(+), 57 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/Collections.scala b/actor/src/main/scala/org/apache/pekko/util/Collections.scala index 1fa5003da1..12f9114959 100644 --- a/actor/src/main/scala/org/apache/pekko/util/Collections.scala +++ b/actor/src/main/scala/org/apache/pekko/util/Collections.scala @@ -20,6 +20,30 @@ import scala.collection.immutable * INTERNAL API */ private[pekko] object Collections { + // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, + // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. + // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458 + private object NotApplied extends (Any => Any) { + final override def apply(v1: Any): Any = this + } + + implicit class IterableOps[T](val iterable: java.lang.Iterable[T]) extends AnyVal { + def collectToImmutableSeq[R](pf: PartialFunction[T, R]): immutable.Seq[R] = { + val builder = immutable.Seq.newBuilder[R] + iterable.forEach((t: T) => { + // 1. `applyOrElse` is faster than (`pf.isDefinedAt` and then `pf.apply`) + // 2. using reference comparing here instead of pattern matching can generate less and quicker bytecode, + // eg: just a simple `IF_ACMPNE`, and you can find the same trick in `CollectWhile` operator. + // If you interest, you can check the associated PR for this change and the + // current implementation of `scala.collection.IterableOnceOps.collectFirst`. + pf.applyOrElse(t, NotApplied) match { + case _: NotApplied.type => // do nothing + case r: R @unchecked => builder += r + } + }) + builder.result() + } + } case object EmptyImmutableSeq extends immutable.Seq[Nothing] { override final def iterator = Iterator.empty diff --git a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala b/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala deleted file mode 100644 index 56b4a02844..0000000000 --- a/stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pekko.stream -package javadsl - -import scala.collection.immutable -import scala.jdk.CollectionConverters._ - -import org.apache.pekko -import pekko.annotation.InternalApi - -/** - * INTERNAL API - * - * Utility methods for converting Java collections to Scala collections. - */ -@InternalApi -private[javadsl] object CollectionUtil { - @inline def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] = - jlist.asScala.toSeq - - @inline def toSeq[T](jiterable: java.lang.Iterable[T]): immutable.Seq[T] = - jiterable.asScala.toSeq -} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1c985575d3..88f40cee84 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3495,7 +3495,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -3577,7 +3578,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b280ae3a79..bcf53dbbf0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -465,7 +465,8 @@ object Sink { sinks: java.util.List[_ <: Graph[SinkShape[U], M]], fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]) : Sink[T, java.util.List[M]] = { - val seq = if (sinks ne null) CollectionUtil.toSeq(sinks).collect { + import pekko.util.Collections._ + val seq = if (sinks ne null) sinks.collectToImmutableSeq { case sink: Sink[U @unchecked, M @unchecked] => sink.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index f1bb9e7733..9df1019e60 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -549,7 +549,8 @@ object Source { rest: java.util.List[Source[T, _ <: Any]], fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, NotUsed] = { - val seq = if (rest ne null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq() + import scala.jdk.CollectionConverters._ + val seq = if (rest ne null) rest.asScala.map(_.asScala).toSeq else immutable.Seq() new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num))) } @@ -574,7 +575,8 @@ object Source { sources: java.util.List[_ <: Graph[SourceShape[T], M]], fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, java.util.List[M]] = { - val seq = if (sources ne null) CollectionUtil.toSeq(sources).collect { + import pekko.util.Collections._ + val seq = if (sources ne null) sources.collectToImmutableSeq { case source: Source[T @unchecked, M @unchecked] => source.asScala case other => other } @@ -586,7 +588,8 @@ object Source { * Combine the elements of multiple streams into a stream of lists. */ def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = { - val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() + import scala.jdk.CollectionConverters._ + val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq() new Source(scaladsl.Source.zipN(seq).map(_.asJava)) } @@ -596,7 +599,8 @@ object Source { def zipWithN[T, O]( zipper: function.Function[java.util.List[T], O], sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = { - val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() + import scala.jdk.CollectionConverters._ + val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq() new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq)) } @@ -844,9 +848,10 @@ object Source { def mergePrioritizedN[T]( sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]], eagerComplete: Boolean): javadsl.Source[T, NotUsed] = { + import scala.jdk.CollectionConverters._ val seq = if (sourcesAndPriorities ne null) - CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue())) + sourcesAndPriorities.asScala.map(pair => (pair.first.asScala, pair.second.intValue())).toSeq else immutable.Seq() new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete)) @@ -1625,7 +1630,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -1705,7 +1711,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 0525aa9cb7..fa3ee316b1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2368,7 +2368,8 @@ final class SubFlow[In, Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2426,7 +2427,8 @@ final class SubFlow[In, Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index d93246a5a3..6324c447f9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2334,7 +2334,8 @@ final class SubSource[Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubSource[Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2393,7 +2394,8 @@ final class SubSource[Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubSource[Out, Mat] = { - val seq = if (those ne null) CollectionUtil.toSeq(those).collect { + import pekko.util.Collections._ + val seq = if (those ne null) those.collectToImmutableSeq { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala index dd4a64a99f..22fe45c4eb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala @@ -42,6 +42,7 @@ import pekko.stream.SystemMaterializer import pekko.stream.TLSClosing import pekko.stream.scaladsl import pekko.util.ByteString +import pekko.japi.Util.immutableSeq object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { @@ -177,7 +178,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( delegate - .bind(interface, port, backlog, CollectionUtil.toSeq(options), halfClose, optionalDurationToScala(idleTimeout)) + .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout)) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) @@ -228,7 +229,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { .outgoingConnection( remoteAddress, localAddress.toScala, - CollectionUtil.toSeq(options), + immutableSeq(options), halfClose, optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout)) @@ -291,7 +292,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { remoteAddress, createSSLEngine = () => createSSLEngine.create(), localAddress.toScala, - CollectionUtil.toSeq(options), + immutableSeq(options), optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout), session => @@ -342,7 +343,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { port, createSSLEngine = () => createSSLEngine.create(), backlog, - CollectionUtil.toSeq(options), + immutableSeq(options), optionalDurationToScala(idleTimeout), session => verifySession.apply(session).toScala match { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
