This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d2d1902ce4 =str Fold InHandler and OutHandler into GraphStageLogic for wiretap.
d2d1902ce4 is described below

commit d2d1902ce40309703b6a40373c810c5fddc2841e
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 2 21:13:48 2023 +0800

    =str Fold InHandler and OutHandler into GraphStageLogic for wiretap.
---
 .../org/apache/pekko/stream/scaladsl/Graph.scala   | 88 +++++++++++-----------
 1 file changed, 42 insertions(+), 46 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
index b3d0ec2591..f5a5b92bfd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
@@ -710,61 +710,57 @@ private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]]
   val in: Inlet[T] = Inlet[T]("WireTap.in")
   val outMain: Outlet[T] = Outlet[T]("WireTap.outMain")
   val outTap: Outlet[T] = Outlet[T]("WireTap.outTap")
-  override def initialAttributes = DefaultAttributes.wireTap
+  override def initialAttributes: Attributes = DefaultAttributes.wireTap
   override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outTap)
 
-  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
-    private var pendingTap: Option[T] = None
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private var pendingTap: Option[T] = None
 
-    setHandler(in,
-      new InHandler {
-        override def onPush() = {
-          val elem = grab(in)
-          push(outMain, elem)
-          if (isAvailable(outTap)) {
-            push(outTap, elem)
-          } else {
-            pendingTap = Some(elem)
-          }
+      override def onPush(): Unit = {
+        val elem = grab(in)
+        push(outMain, elem)
+        if (isAvailable(outTap)) {
+          push(outTap, elem)
+        } else {
+          pendingTap = Some(elem)
         }
-      })
+      }
+      override def onPull(): Unit = {
+        pull(in)
+      }
 
-    setHandler(outMain,
-      new OutHandler {
-        override def onPull() = {
-          pull(in)
-        }
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        cancelStage(cause)
+      }
 
-        override def onDownstreamFinish(cause: Throwable): Unit = {
-          cancelStage(cause)
-        }
-      })
+      // The 'tap' output can neither backpressure, nor cancel, the stage.
+      setHandler(
+        outTap,
+        new OutHandler {
+          override def onPull() = {
+            pendingTap match {
+              case Some(elem) =>
+                push(outTap, elem)
+                pendingTap = None
+              case None => // no pending element to emit
+            }
+          }
 
-    // The 'tap' output can neither backpressure, nor cancel, the stage.
-    setHandler(
-      outTap,
-      new OutHandler {
-        override def onPull() = {
-          pendingTap match {
-            case Some(elem) =>
-              push(outTap, elem)
-              pendingTap = None
-            case None => // no pending element to emit
+          override def onDownstreamFinish(cause: Throwable): Unit = {
+            setHandler(in,
+              new InHandler {
+                override def onPush() = {
+                  push(outMain, grab(in))
+                }
+              })
+            // Allow any outstanding element to be garbage-collected
+            pendingTap = None
           }
-        }
+        })
 
-        override def onDownstreamFinish(cause: Throwable): Unit = {
-          setHandler(in,
-            new InHandler {
-              override def onPush() = {
-                push(outMain, grab(in))
-              }
-            })
-          // Allow any outstanding element to be garbage-collected
-          pendingTap = None
-        }
-      })
-  }
+      setHandlers(in, outMain, this)
+    }
   override def toString = "WireTap"
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to