I wrote a simple test case using a graph rather than using src.runForEach.  

"CipherStage in graph" should "work" in {

  val clearText = "0123456789abcdef"
  val clearSource = Source.single(ByteString(clearText))
  val encryptedOut = Sink.head[ByteString]

  val encodedKey: String = "KCl02Tjzsid09VnDl6CDpDlnm4G4VUJr8l6PNg+MHkQ="
  val decodedKey = Base64.getDecoder.decode(encodedKey)
  val key = new SecretKeySpec(decodedKey, 0, decodedKey.length, "AES")

  val encodedIv: String = "AAAAAAAAAAAAAAAAAAAAAA=="
  val decodedIv = Base64.getDecoder.decode(encodedIv)
  val iv = new IvParameterSpec(decodedIv)

  val graph = GraphDSL.create(encryptedOut) { implicit builder => encOut =>
    import GraphDSL.Implicits._

    val in: Source[ByteString, Any] = clearSource
    val encryptor = new CipherStage(key, iv, Cipher.ENCRYPT_MODE)

    in ~> encryptor ~> encOut

  val rg = RunnableGraph.fromGraph[Future[ByteString]](graph)
  implicit val system = ActorSystem("test")
  implicit val materializer = ActorMaterializer()

  val blkOutFuture = rg.run()

  implicit val ec = system.dispatcher

  whenReady(blkOutFuture) { blkOut =>
    blkOut should be (encrypt(key, iv, clearText.getBytes))
This test fails.  However, when I run this test case, I get the following stack 

[ERROR] [10/07/2016 12:40:03.541] [test-akka.actor.default-dispatcher-3] 
[akka://test/user/StreamSupervisor-0/flow-0-0-unknown-operation] Error in stage 
[com.genecloud.blockstore.CipherStage@3e77bd3f]: requirement failed: Cannot 
push port (Encryptor.out) twice
java.lang.IllegalArgumentException: requirement failed: Cannot push port 
(Encryptor.out) twice
        at scala.Predef$.require(Predef.scala:224)
        at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:459)
        at akka.actor.Actor$class.aroundPreStart(Actor.scala:489)
        at akka.actor.ActorCell.create(ActorCell.scala:590)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
        at akka.dispatch.Mailbox.run(Mailbox.scala:223)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

I don’t see that I am pushing twice.  Here is the current definition of 

class CipherStage(key: SecretKey, iv: IvParameterSpec, mode: Int) extends 
GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("Encryptor.in")
  val out = Outlet[ByteString]("Encryptor.out")
  override val shape = FlowShape.of(in, out)

  val log = Logger(LoggerFactory.getLogger("CipherStage"))

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new GraphStageLogic(shape) {
    val cipher: Cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
    cipher.init(mode, key, iv)

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val chunk = grab(in)
        log.info(s"got chunk: ${chunk}")
        push(out, ByteString(cipher.update(chunk.toArray)))

      override def onUpstreamFinish(): Unit = {
        val chunk = cipher.doFinal()
        log.info(s"got final chunk: ${chunk}")
        push(out, ByteString(chunk))

      override def onUpstreamFailure(ex: Throwable): Unit = {

In this graph (which is simpler than my original graph, which combined 
digesting and encryption), onUpstreamFinish *is* being called, it just results 
in an exception on the “push” call.  I absolutely need to be able to “push” 
during onStreamFinish, since with AES/CBC encryption, there are more stream 
elements after the last input that must be emitted.

Anyone see what is wrong?  — Eric

