He-Pin commented on code in PR #1669:
URL: https://github.com/apache/pekko/pull/1669#discussion_r1902068068
##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala:
##########
@@ -77,6 +78,48 @@ import pekko.util.ccompat._
}
}
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object ZipWithIndex extends
GraphStage[FlowShape[Any, (Any, Long)]] {
+ val in = Inlet[Any]("ZipWithIndex.in")
+ val out = Outlet[(Any, Long)]("ZipWithIndex.out")
+ override val shape = FlowShape(in, out)
+ override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private var index = 0L
+ override def onPush(): Unit = {
+ push(out, (grab(in), index))
+ index += 1
+ }
+
+ override def onPull(): Unit = pull(in)
+ setHandlers(in, out, this)
+ }
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object ZipWithIndexJava extends
GraphStage[FlowShape[Any, Pair[Any, Long]]] {
+ val in = Inlet[Any]("ZipWithIndex.in")
+ val out = Outlet[Pair[Any, Long]]("ZipWithIndex.out")
+ override val shape = FlowShape(in, out)
+ override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private var index = 0L
+ override def onPush(): Unit = {
+ push(out, new Pair(grab(in), index))
+ index += 1
+ }
+
+ override def onPull(): Unit = pull(in)
+ setHandlers(in, out, this)
+ }
+}
Review Comment:
Since these operators are pretty common, I think it would be better to
implement them with dedicated graphs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]