Hi Johan, Thank you for the quick response! Indeed, in my application I do create the DatabasePublisher anew for each request. This is a mistake I introduced when I stripped down the code for this forum post. Sorry about this.
The problem I have is that even when the http connection is closed, the database query still continues streaming. I would assume it should stop since there is no downstream demand anymore when the connection closes... I am not sure whether this is a problem related to slick or if my streaming setup is flawed. Mark On Thursday, February 2, 2017 at 12:18:38 AM UTC+1, Akka Team wrote: > > From the slick docs: > > Execution of the DBIOAction does not start until a Subscriber is attached > to the stream. Only a singleSubscriber is supported, and any further > attempts to subscribe again will fail ... > > > I'm guessing what you actually want is not to create your publisher once > upon application start and subscribe to that multiple time like you do now > but rather once for every request that comes in to webservice/csv (so > inside the block of the get directive. > > -- > Johan > Akka Team > > On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein <[email protected] > <javascript:>> wrote: > >> Hi guys! >> >> (Cross-posting this since I did not get a response in the Slick group.) >> >> I use akka-http together with Slick to stream rows from a mysql database >> as csv via http. >> >> Streaming works as expected unless I disconnect the http connection >> prematurely. In this case I would expect that the sql query should stop >> executing since there is no downstream demand anymore. However, I can see >> in the mysql processes that the query is still running and it only stops >> when I shut down the webserver (or when the query finishes fetching all the >> rows). >> >> If I turn on debug logs I see repeatedly, while streaming: >> >> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending >> streaming action with continuation (more data available) >> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG >> slick.basic.BasicBackend.stream - Scheduling stream continuation after >> transition from demand = 0 >> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting >> streaming action, realDemand = 8 >> >> Then, once I disconnect the connection, and there is no downstream demand >> anymore, I see: >> >> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending >> streaming action with continuation (more data available) >> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG >> akka.io.TcpIncomingConnection - Closing connection due to IO error >> java.io.IOException: Broken pipe >> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG >> slick.basic.BasicBackend.stream - Scheduling stream continuation after >> transition from demand = 0 >> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting >> streaming action, realDemand = oo >> >> What is going on here? Why is realDemand oo? >> >> My code looks something like this: >> >> object Main extends App { >> implicit val system = ActorSystem() >> implicit val executor = system.dispatcher >> implicit val materializer = ActorMaterializer() >> >> implicit val csvMarshaller = >> Marshaller.withFixedContentType[CSVLine, >> ByteString](ContentTypes.`text/csv(UTF-8)`) { >> case CSVLine(line) => ByteString(line) >> } >> >> implicit val csvStreamingSupport = EntityStreamingSupport.csv() >> .withParallelMarshalling(parallelism = 8, unordered = false) >> >> val query = ??? // a Slick query >> >> val publisher: DatabasePublisher[Tick] = DB.get.stream( >> query.result.withStatementParameters(statementInit = DB.enableStream)) >> >> val routes = { >> logRequestResult("webservice") { >> encodeResponse { >> pathPrefix("csv") { >> pathSingleSlash { >> get { >> complete { >> Source.fromPublisher(publisher).map(t => >> CSVLine(t.toCSV())) >> } >> } >> } >> } >> } >> } >> } >> >> Http().bindAndHandle(routes, "127.0.0.1", 9000) >> } >> >> object DB { >> private val db = Database.forConfig("db") >> >> def get = db >> >> def enableStream(statement: java.sql.Statement): Unit = { >> statement match { >> case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) => >> >> s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults() >> case _ => // do nothing >> } >> } >> } >> >> Any thoughts on this? >> >> Mark >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <javascript:>. >> To post to this group, send email to [email protected] >> <javascript:>. >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
