remyhaemmerle-da opened a new issue, #1525: URL: https://github.com/apache/pekko/issues/1525
[org.apache.pekko.stream.scaladsl.GraphDSL.Implicits.PortOps](https://github.com/apache/pekko/blob/95e8355c4c6f2041587e4d64590764e008d1f2b2/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala#L1839) inherits from [org.apache.pekko.stream.scaladsl.Flow](https://github.com/apache/pekko/blob/95e8355c4c6f2041587e4d64590764e008d1f2b2/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala#L61). As of https://github.com/apache/pekko/pull/591, the implementation of [Flow#zipWithIndex](https://github.com/apache/pekko/blob/95e8355c4c6f2041587e4d64590764e008d1f2b2/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala#L3307) calls [Flow#withAttributes](https://github.com/apache/pekko/blob/95e8355c4c6f2041587e4d64590764e008d1f2b2/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala#L3310):

```
def zipWithIndex: Repr[(Out, Long)] =
  statefulMap(() => 0L)((index, out) => (index + 1L, (out, index)), _ => None)
    .withAttributes(DefaultAttributes.zipWithIndex)
```

However, `PortOpsImpl`, the implementation of `PortOps`, does not support [`withAttributes`](https://github.com/apache/pekko/blob/95e8355c4c6f2041587e4d64590764e008d1f2b2/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala#L1848):

```
override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
```

As a result, calling `zipWithIndex` on a `PortOps` throws. Here is a small snippet that works with Pekko 1.0.2 but crashes with 1.1.1:

```
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.scaladsl._

object Example extends App {

  import GraphDSL.Implicits._

  implicit val system = ActorSystem("GraphAndShapeExampleSystem")
  implicit val materializer = ActorMaterializer()

  val pickMaxOfThree = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
    val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
    zip1.out ~> zip2.in0

    UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
  }

  val resultSink = Sink.foreach(println)

  val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
    import GraphDSL.Implicits._

    // importing the partial graph will return its shape (inlets & outlets)
    val pm3 = b.add(pickMaxOfThree)

    Source.single(1) ~> pm3.in(0)
    Source.single(2) ~> pm3.in(1)
    Source.single(3) ~> pm3.in(2)
    pm3.out.zipWithIndex ~> sink.in
    ClosedShape
  })

  g.run()
}
```
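
A possible workaround, sketched here under assumptions and not verified against 1.1.1: build the indexing step as a standalone `Flow`, where `withAttributes` is supported, and attach it to the port with `via` instead of calling `zipWithIndex` on the `PortOps` directly. The object name `ZipWithIndexWorkaround` and the sample input list are made up for illustration.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.scaladsl._

// Hypothetical minimal example; names and input data are illustrative only.
object ZipWithIndexWorkaround extends App {
  implicit val system: ActorSystem = ActorSystem("ZipWithIndexWorkaround")

  val resultSink = Sink.foreach[(Int, Long)](println)

  val g = RunnableGraph.fromGraph(GraphDSL.createGraph(resultSink) { implicit b => sink =>
    import GraphDSL.Implicits._

    val src = b.add(Source(List(3, 1, 2)))

    // zipWithIndex is applied to a regular Flow, so its internal withAttributes
    // call does not go through PortOpsImpl; `via` then wires the finished flow
    // to the outlet.
    src.out.via(Flow[Int].zipWithIndex) ~> sink.in
    ClosedShape
  })

  // Shut the actor system down once the sink has completed.
  g.run().onComplete(_ => system.terminate())(system.dispatcher)
}
```

In the reproducer above, the equivalent change would be replacing `pm3.out.zipWithIndex` with `pm3.out.via(Flow[Int].zipWithIndex)`.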
