Looks good Petr!
No, we do not have a built-in combinator like this, I can see it being
useful so perhaps it's worth rising an issue on akka/akka
<https://github.com/akka/akka/issues/new>, if you'd have a moment to spare
to explain the use case in a ticket - thanks!

On Sat, May 30, 2015 at 10:09 PM, Petr Janda <petrjand...@gmail.com> wrote:

> Hi Konrad,
>
> thanks for elaboration and advice about io.Source. I've heard that
> io.Source had some design flaws but wasn't sure what it was specifically.
> Will try your suggestion.
>
> Regarding the Flow, what you suggest is very close to what I've end up
> doing, as I've made a Flow which effectively transforms a Sink to Flow like
> this:
>
> object SinkFlow {
>   def apply[T](sink:Sink[T, _]): Flow[T, T, _] = Flow() { implicit b =>
>
>     import akka.stream.scaladsl.FlowGraph.Implicits._
>
>
>     val bcast = b.add(Broadcast[T](2))
>
>
>     bcast.out(0) ~> b.add(sink)
>
>
>     (bcast.in, bcast.out(1))
>   }
> }
>
> which allowed me to connect multiple Sinks into the flow as:
>
> source
>     .via(...)
>     .via(SinkFlow(sink))
>     .to(Sink.onComplete { ... }).run()
>
> I was just wondering if I could have overlooked something more "built in".
>
> Thanks!
> ~Petr
>
> On Friday, 29 May 2015 17:27:48 UTC+1, Konrad Malawski wrote:
>>
>> Hi Petr,
>> Firstly - do not use the io.Source + getLines trick to get lines from a
>> File, it's horribly slow :-)
>> Instead use the SynchronousFileSource*as shown in
>> stream-io.html#Streaming_File_IO
>> <http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-io.html#Streaming_File_IO>
>> .
>> It's much faster and also takes care of closing the File properly in case
>> of completion or failure.
>> You'll want to use the parseLines cookbook recipe for the time being for
>> parsing lines:
>>
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
>>
>> We already have a built-in lines parser in the works and it will be
>> provided in the next RC of Akka Streams:
>> https://github.com/akka/akka/pull/17446
>>
>> Secondly, more generally speaking in cases like these you can broadcast
>> to 2 sinks, one of them being an onComplete sink and then you
>> can use this sink as the "on completion do this and that" signal.
>>
>>
>> * Synchronous because it's using blocking API, because non-blocking API
>> is not available on Java 6 which Akka Streams have to support currently. We
>> have a ticket for the async file source
>> https://github.com/akka/akka/issues/17269 and it will be provided later
>> on (once Akka Streams join Akka 2.4 with requiring Java 8).
>>
>> On Tue, May 26, 2015 at 4:33 PM, Petr Janda <petrj...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I was wondering what is the best practice used in Akka Streams to clean
>>> up opened resources. My example use case is the stream reading lines from
>>> the file, streaming them to Apache Kafka (using
>>> https://github.com/softwaremill/reactive-kafka subscriber). See the
>>> example code here:
>>>
>>> val file = io.Source.fromFile(path)
>>> val lines = file.getLines()
>>> val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost =
>>> "localhost:2181")
>>> val subscriber = kafka.publish("uppercaseStrings", "groupName", new
>>> StringEncoder())
>>>
>>> Source(() => lines)
>>>   .map(_.toUpperCase)
>>>   .to(Sink(subscriber))
>>>   .run()
>>>
>>>
>>> As soon as the flow is done I would like to cleanup and close the file
>>> input stream. One way I used to go about that is to have Sink.onComplete or
>>> Sink.fold although this is not viable here. Also, ideally I would like to
>>> close the file in case of any error.
>>>
>>> Could you advice on any idiomatic way to do this?
>>>
>>> Thanks,
>>> ~Petr
>>>
>>>
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Cheers,
>> Konrad 'ktoso' Malawski
>> Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>
>>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
Konrad 'ktoso' Malawski
Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to