This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 2dc8960074 Change Source.fromArray to Source.apply
2dc8960074 is described below
commit 2dc8960074bfe269da1686609eb88663cb50ad8b
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Nov 2 22:56:02 2025 +0100
Change Source.fromArray to Source.apply
---
.../pekko/stream/DslFactoriesConsistencySpec.scala | 1 +
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 12 +++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 2 +-
.../org/apache/pekko/stream/scaladsl/Source.scala | 31 +++++++++++-----------
4 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 74660a1c46..8aa9aee2c2 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
("apply" -> "fromGraph") ::
("apply" -> "fromIterator") ::
("apply" -> "fromFunctions") ::
+ ("apply" -> "fromArray") ::
Nil
// format: OFF
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 5c3a8f88ee..50972aaed8 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
c.expectComplete()
}
+ "product elements with Array" in {
+ val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[Int]()
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+ sub.request(3)
+ c.expectNext(1)
+ c.expectNext(2)
+ c.expectNext(3)
+ c.expectComplete()
+ }
+
"reject later subscriber" in {
val p = Source.single(1).runWith(Sink.asPublisher(false))
val c1 = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 412af13d34..100eb1d394 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -199,7 +199,7 @@ object Source {
*
* @since 1.1.0
*/
- def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromArray(array))
+ def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source(array))
/**
* Create a `Source` from an `Optional` value, emitting the value if it is present.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e6f587cd2a..5198444a4b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -405,6 +405,21 @@ object Source {
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ /**
+ * Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
+ * otherwise, every element of the array will be emitted sequentially.
+ *
+ * @since 1.3.0
+ */
+ def apply[T](array: Array[T]): Source[T, NotUsed] = {
+ if (array.length == 0)
+ empty
+ else if (array.length == 1)
+ single(array(0))
+ else
+ Source.fromGraph(new ArraySource[T](array))
+ }
+
/**
* Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
@@ -422,22 +437,6 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
fromGraph(new GraphStages.SingleSource(element))
- /**
- * Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
- * otherwise, every element of the array will be emitted sequentially.
- *
- * @since 1.3.0
- */
- def fromArray[T](array: Array[T]): Source[T, NotUsed] = {
- if (array.length == 0) {
- empty
- } else if (array.length == 1) {
- single(array(0))
- } else {
- Source.fromGraph(new ArraySource[T](array))
- }
- }
-
/**
* Create a `Source` from an `Option` value, emitting the value if it is defined.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]