>From the slick docs:

Execution of the DBIOAction does not start until a Subscriber is attached
to the stream. Only a singleSubscriber is supported, and any further
attempts to subscribe again will fail ...


I'm guessing what you actually want is not to create your publisher once
upon application start and subscribe to that multiple time like you do now
but rather once for every request that comes in to webservice/csv (so
inside the block of the get directive.

-- 
Johan
Akka Team

On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein <[email protected]>
wrote:

> Hi guys!
>
> (Cross-posting this since I did not get a response in the Slick group.)
>
> I use akka-http together with Slick to stream rows from a mysql database
> as csv via http.
>
> Streaming works as expected unless I disconnect the http connection
> prematurely. In this case I would expect that the sql query should stop
> executing since there is no downstream demand anymore. However, I can see
> in the mysql processes that the query is still running and it only stops
> when I shut down the webserver (or when the query finishes fetching all the
> rows).
>
> If I turn on debug logs I see repeatedly, while streaming:
>
> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending
> streaming action with continuation (more data available)
> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG
> slick.basic.BasicBackend.stream - Scheduling stream continuation after
> transition from demand = 0
> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting
>  streaming action, realDemand = 8
>
> Then, once I disconnect the connection, and there is no downstream demand
> anymore, I see:
>
> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending
> streaming action with continuation (more data available)
> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG
> akka.io.TcpIncomingConnection - Closing connection due to IO error
> java.io.IOException: Broken pipe
> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG
> slick.basic.BasicBackend.stream - Scheduling stream continuation after
> transition from demand = 0
> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting
>  streaming action, realDemand = oo
>
> What is going on here? Why is realDemand oo?
>
> My code looks something like this:
>
> object Main extends App {
>   implicit val system = ActorSystem()
>   implicit val executor = system.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   implicit val csvMarshaller =
>     Marshaller.withFixedContentType[CSVLine,
> ByteString](ContentTypes.`text/csv(UTF-8)`) {
>       case CSVLine(line) => ByteString(line)
>     }
>
>   implicit val csvStreamingSupport = EntityStreamingSupport.csv()
>     .withParallelMarshalling(parallelism = 8, unordered = false)
>
>   val query = ??? // a Slick query
>
>   val publisher: DatabasePublisher[Tick] = DB.get.stream(
>     query.result.withStatementParameters(statementInit = DB.enableStream))
>
>   val routes = {
>     logRequestResult("webservice") {
>       encodeResponse {
>         pathPrefix("csv") {
>           pathSingleSlash {
>             get {
>               complete {
>                 Source.fromPublisher(publisher).map(t =>
> CSVLine(t.toCSV()))
>               }
>             }
>           }
>         }
>       }
>     }
>   }
>
>   Http().bindAndHandle(routes, "127.0.0.1", 9000)
> }
>
> object DB {
>   private val db = Database.forConfig("db")
>
>   def get = db
>
>   def enableStream(statement: java.sql.Statement): Unit = {
>     statement match {
>       case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
>         s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStream
> ingResults()
>       case _ => // do nothing
>     }
>   }
> }
>
> Any thoughts on this?
>
> Mark
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to