He-Pin commented on code in PR #2365:
URL: https://github.com/apache/pekko/pull/2365#discussion_r2462879139
##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -455,4 +454,86 @@ class FlowStatefulMapSpec extends StreamSpec {
.expectComplete()
}
+ "will not call onComplete twice on cancel when `onComplete` fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource()
+ .viaMat(Flow[Int].statefulMap(() => 23)((s, elem) => (s, elem),
+ _ => {
+ closedCounter.incrementAndGet()
+ throw TE("boom")
+ }))(Keep.left)
+ .toMat(TestSink[Int]())(Keep.both)
+ .run()
+
+ EventFilter[TE](occurrences = 1).intercept {
+ sink.request(1)
+ source.sendNext(1)
+ sink.expectNext(1)
+ sink.cancel()
+ source.expectCancellation()
+ }
+ closedCounter.get() should ===(1)
+ }
+
+ "emit onClose return value before restarting" in {
+ val stateCounter = new AtomicInteger(0)
+ val (source, sink) = TestSource[String]()
+ .viaMat(Flow[String].statefulMap(() => stateCounter.incrementAndGet())({
(s, elem) =>
+ if (elem == "boom") throw TE("boom")
+ else (s, elem + s.toString)
+ }, _ => Some("onClose")))(Keep.left)
+
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
+ .toMat(TestSink())(Keep.both)
+ .run()
+
+ sink.request(1)
+ source.sendNext("one")
+ sink.expectNext("one1")
+ sink.request(1)
+ source.sendNext("boom")
+ sink.expectNext("onClose")
+ sink.request(1)
+ source.sendNext("two")
+ sink.expectNext("two2")
+ sink.cancel()
+ source.expectCancellation()
+ }
+
+ "not allow null state" in {
+ EventFilter[NullPointerException](occurrences = 1).intercept {
+ Source
+ .single("one")
+ .statefulMap(() => null: String)((s, t) => (s, t), _ => None)
+ .runWith(Sink.head)
+ .failed
+ .futureValue shouldBe a[NullPointerException]
+ }
+ }
+
+ "not allow null next state" in {
+ EventFilter[NullPointerException](occurrences = 1).intercept {
+ Source
+ .single("one")
+ .statefulMap(() => "state")((_, t) => (null, t), _ => None)
+ .runWith(Sink.seq)
+ .failed
+ .futureValue shouldBe a[NullPointerException]
+ }
+ }
+
+ "not allow null state on restart" in {
Review Comment:
I still like to keep the old behavior, we can improve both document and test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]