[akka-user] Re: [akka-streams] Generic streams and abstract types

2017-07-12 Thread Jeff
Thanks for the great suggestions - I eventually came to the "custom tuple" 
solution myself and it seems to work well. 

As for the issue of complexity, it's actually not as complex as it sounds. 
I'm using Http().superPool() to make API requests, and I wanted to avoid 
creating a separate stream for every kind of API request when the only 
thing that changed was the Unmarshaller. Instead of materializing multiple 
streams that differ only in their Sink, I created one stream where the 
Sink.foreach(...) takes the Unmarshaller function and resolves the Promise. 
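
A minimal sketch of that single-stream setup, under several assumptions: 
the names Pending, hub and submit are hypothetical, a plain String stands 
in for the HTTP response that superPool would deliver, the "unmarshaller" 
is a synchronous String => T function rather than a real akka-http 
Unmarshaller, and an Akka 2.5-style ActorMaterializer is used. 

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object SharedSinkSketch {
  implicit val system: ActorSystem = ActorSystem("shared-sink-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical element type: the raw payload, the function that decodes it,
  // and the promise to resolve. T links the decoder's result to the promise.
  final case class Pending[T](raw: String, unmarshal: String => T, promise: Promise[T]) {
    // Inside the class T is known, so this type-checks even for Pending[_].
    def resolve(): Unit = promise.complete(Try(unmarshal(raw)))
  }

  // One long-lived stream: every producer attaches to the same hub sink.
  val hub: Sink[Pending[_], NotUsed] =
    MergeHub
      .source[Pending[_]](perProducerBufferSize = 16)
      .to(Sink.foreach[Pending[_]](_.resolve()))
      .run()

  // Hypothetical helper: push one element into the hub, hand back the future.
  def submit[T](raw: String)(unmarshal: String => T): Future[T] = {
    val p = Promise[T]()
    Source.single[Pending[_]](Pending[T](raw, unmarshal, p)).runWith(hub)
    p.future
  }

  def main(args: Array[String]): Unit = {
    val asInt: Future[Int] = submit("42")(_.toInt)
    val asLength: Future[Int] = submit("hello")(_.length)
    println(Await.result(asInt, 3.seconds))
    println(Await.result(asLength, 3.seconds))
    system.terminate()
  }
}
```

Only the decoding function differs per call to submit; the hub and its 
Sink.foreach are materialized once. 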

On Wednesday, July 12, 2017 at 5:24:07 AM UTC-7, johannes...@lightbend.com 
wrote:
>
> Hi Jeff,
>
> your API seems quite complex. I don't know the purpose of it so I cannot 
> suggest anything but I'd try to simplify. :)
>
> That said, your problem seems to be that you cannot write a concrete type 
> that would express the dependency between the two components of the tuple 
> `(RequestBuilder, Promise[???])`. There are two ways to solve it:
>
> 1) make `Out` a type parameter and convert `sink` to a `def sink[Out]`, 
> then you can use the tuple `(RequestBuilder[Out], Promise[Out])`
> 2) create your own custom tuple type that lets you express the dependency:
>
> case class BuilderWithPromise[O](builder: RequestBuilder { type Out = O }, 
> promise: Promise[O])
>
> and then use it as MergeHub.source[BuilderWithPromise[_]]
>
> But I can only repeat that getting rid of the dependent types if possible 
> is usually the best solution ;)
>
> Johannes
>
> On Thursday, July 6, 2017 at 11:23:50 PM UTC+2, Jeff wrote:
>>
>> Here is a strawman program which illustrates the issue I am having
>>
>> trait RequestBuilder {
>>   type Out
>>
>>   def complete(p: Promise[Out]): Unit
>> }
>>
>> def makeRequest(in: RequestBuilder): Source[(RequestBuilder, 
>> Promise[in.Out]), Future[in.Out]] = {
>>   val p = Promise[in.Out]
>>
>>   Source.single(in -> p).mapMaterializedValue(_ => p.future)
>> }
>>
>> val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
>>   case (r, p) => r.complete(p)
>> }).run()
>>
>> sink.runWith(makeRequest(new RequestBuilder {
>>   type Out = Int
>>
>>   def complete(p: Promise[Out]): Unit = p.success(1)
>> }))
>>
>>
>> The issue is: how do I type the Promise[???] in the sink? I have been 
>> able to work around this by making the Promise a part of the RequestBuilder 
>> trait itself, but this seems like a code smell to me.
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: [akka-streams] Generic streams and abstract types

2017-07-12 Thread johannes . rudolph
Hi Jeff,

your API seems quite complex. I don't know the purpose of it so I cannot 
suggest anything but I'd try to simplify. :)

That said, your problem seems to be that you cannot write a concrete type 
that would express the dependency between the two components of the tuple 
`(RequestBuilder, Promise[???])`. There are two ways to solve it:

1) make `Out` a type parameter and convert `sink` to a `def sink[Out]`, 
then you can use the tuple `(RequestBuilder[Out], Promise[Out])`
2) create your own custom tuple type that lets you express the dependency:

case class BuilderWithPromise[O](builder: RequestBuilder { type Out = O }, 
promise: Promise[O])

and then use it as MergeHub.source[BuilderWithPromise[_]]
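
A minimal sketch of option 2, using only the Scala standard library so it 
runs without Akka. The run() helper, makeRequest's tuple return type, and 
the consume function (a stand-in for the body of Sink.foreach) are 
assumptions added for illustration; in a stream, the element type would be 
passed to MergeHub.source[BuilderWithPromise[_]] as described above.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BuilderWithPromiseSketch {

  trait RequestBuilder {
    type Out
    def complete(p: Promise[Out]): Unit
  }

  // The type parameter O ties the builder's Out to the promise's type.
  final case class BuilderWithPromise[O](
      builder: RequestBuilder { type Out = O },
      promise: Promise[O]) {
    // Hypothetical helper: inside the class O is known, so this compiles
    // even when a caller only sees BuilderWithPromise[_].
    def run(): Unit = builder.complete(promise)
  }

  // Pairs a builder with a fresh promise and exposes the matching future.
  def makeRequest[O](in: RequestBuilder { type Out = O }): (BuilderWithPromise[O], Future[O]) = {
    val p = Promise[O]()
    (BuilderWithPromise[O](in, p), p.future)
  }

  // Stand-in for Sink.foreach: handles elements with differing Out types.
  def consume(elements: Seq[BuilderWithPromise[_]]): Unit =
    elements.foreach(_.run())

  def demo(): (Int, String) = {
    val intBuilder: RequestBuilder { type Out = Int } = new RequestBuilder {
      type Out = Int
      def complete(p: Promise[Int]): Unit = p.success(1)
    }
    val stringBuilder: RequestBuilder { type Out = String } = new RequestBuilder {
      type Out = String
      def complete(p: Promise[String]): Unit = p.success("ok")
    }

    val (a, futureInt) = makeRequest[Int](intBuilder)
    val (b, futureString) = makeRequest[String](stringBuilder)

    consume(Seq[BuilderWithPromise[_]](a, b))

    (Await.result(futureInt, 1.second), Await.result(futureString, 1.second))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The consumer never needs to name the concrete Out type: the pairing is 
checked once, where the BuilderWithPromise is constructed.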

But I can only repeat that getting rid of the dependent types if possible 
is usually the best solution ;)

Johannes

On Thursday, July 6, 2017 at 11:23:50 PM UTC+2, Jeff wrote:
>
> Here is a strawman program which illustrates the issue I am having
>
> trait RequestBuilder {
>   type Out
>
>   def complete(p: Promise[Out]): Unit
> }
>
> def makeRequest(in: RequestBuilder): Source[(RequestBuilder, 
> Promise[in.Out]), Future[in.Out]] = {
>   val p = Promise[in.Out]
>
>   Source.single(in -> p).mapMaterializedValue(_ => p.future)
> }
>
> val sink = MergeHub.source[(RequestBuilder, Promise[???])].to(Sink.foreach {
>   case (r, p) => r.complete(p)
> }).run()
>
> sink.runWith(makeRequest(new RequestBuilder {
>   type Out = Int
>
>   def complete(p: Promise[Out]): Unit = p.success(1)
> }))
>
>
> The issue is: how do I type the Promise[???] in the sink? I have been 
> able to work around this by making the Promise a part of the RequestBuilder 
> trait itself, but this seems like a code smell to me.
>



Re: [akka-user] Re: Akka Camel and Akka Streams

2017-07-12 Thread Konrad “ktoso” Malawski
And it is listed under Alpakka projects too :-)
http://developer.lightbend.com/docs/alpakka/current/external-connectors.html#camel


— Konrad


On 12 July 2017 at 15:06:01, 'Martin Krasser' via Akka User List (
akka-user@googlegroups.com) wrote:

Hi Andreas,

the replacement for akka-camel is the Streamz project with its Camel DSL
for Akka Streams. It allows you to use any Camel component with Akka
Streams for both inbound and outbound message exchanges. There is a Scala
and Java API.

Best Regards,
Martin

On Tuesday, July 11, 2017 at 2:26:09 PM UTC+2, Andreas Gies wrote:
>
> Hello HAkkers,
>
> we are maintaining an integration framework (OSGi) project built on top of
> ActiveMQ, Spray and Camel implemented in Scala [1]
>
> Most of our internal APIs rely on Akka and some also on the Akka-Camel
> integration.
>
>
> With the next major release we plan to upgrade our Spray routes to
> Akka-Http. Now that I have started looking at the concrete
> steps I have noticed that Akka-Camel is also deprecated and to be replaced
> with alpakka.
>
> I had a look through the Alpakka project. So far I haven't found the
> replacement for the Akka-Camel efforts in there.
> Perhaps I have overlooked something, or is the replacement just on the
> roadmap?
>
> Also I did have a look at the JMS part of Alpakka. It seems that this
> implementation currently only supports TextMessages
> and ignores any properties within the message. If I understand the
> implementation correctly, it would fail the stream in case
> of any JMSException and also when the buffer has been filled up ?
>
> On a broader level I was wondering if the implementation should be in the
> form that the inbound stream fails if and only if
> the connection is irrecoverably broken and in other cases the Stream
> should reconnect transparently.
>
> Also on a broader level I have noticed that the messages are acknowledged
> as soon as they are pushed. Coming from a JMS
> background that feels a bit strange to me, but that might be because I am
> unfamiliar with the streaming API. In our world
> a message is normally acknowledged when it has been processed successfully
> (which normally means it has been written to
> the file system, forwarded to another JMS destination or triggered some
> execution in the database).
>
> If the container crashes before it has acknowledged the message, the
> message will be redelivered. If we encounter
> an error, we pass the message to an error handling destination or a retry
> destination.
>
>
> Apparently the architecture and the acceptance level of message loss
> changes when switching to a streaming approach.
>
>
> For now I have some concrete questions:
>
> 1) Have I missed the Camel replacement in Alpakka and if so, where is it
> located within Alpakka ?
> 2) How are others coping with the "window of potential message loss" when
> migrating from pure JMS flows to streams ?
> 3) Any pointers to good hands-on white papers are much appreciated. I have
> read through some and also through most
> of the streams documentation, but I guess I need to get my hands dirty
> ...
> 4) I don't dare to ask, but if anyone is using Akka / AkkaHttp and / or
> AkkaStream within OSGi I would be more than happy
> to exchange experiences & ideas.
>
> [1] https://github.com/woq-blended/blended
>
>
> Thanks in advance for your attention
> Best regards
> Andreas
>


[akka-user] Re: Akka Camel and Akka Streams

2017-07-12 Thread 'Martin Krasser' via Akka User List
Hi Andreas,

the replacement for akka-camel is the Streamz project with its Camel DSL 
for Akka Streams. It allows you to use any Camel component with Akka 
Streams for both inbound and outbound message exchanges. There is a Scala 
and Java API. 

Best Regards,
Martin

On Tuesday, July 11, 2017 at 2:26:09 PM UTC+2, Andreas Gies wrote:
>
> Hello HAkkers, 
>
> we are maintaining an integration framework (OSGi) project built on top of 
> ActiveMQ, Spray and Camel implemented in Scala [1]
>
> Most of our internal APIs rely on Akka and some also on the Akka-Camel 
> integration. 
>
>
> With the next major release we plan to upgrade our Spray routes to 
> Akka-Http. Now that I have started looking at the concrete 
> steps I have noticed that Akka-Camel is also deprecated and to be replaced 
> with alpakka. 
>
> I had a look through the Alpakka project. So far I haven't found the 
> replacement for the Akka-Camel efforts in there. 
> Perhaps I have overlooked something, or is the replacement just on the 
> roadmap? 
>
> Also I did have a look at the JMS part of Alpakka. It seems that this 
> implementation currently only supports TextMessages 
> and ignores any properties within the message. If I understand the 
> implementation correctly, it would fail the stream in case 
> of any JMSException and also when the buffer has been filled up ?
>
> On a broader level I was wondering if the implementation should be in the 
> form that the inbound stream fails if and only if 
> the connection is irrecoverably broken and in other cases the Stream 
> should reconnect transparently. 
>
> Also on a broader level I have noticed that the messages are acknowledged 
> as soon as they are pushed. Coming from a JMS
> background that feels a bit strange to me, but that might be because I am 
> unfamiliar with the streaming API. In our world 
> a message is normally acknowledged when it has been processed successfully 
> (which normally means it has been written to 
> the file system, forwarded to another JMS destination or triggered some 
> execution in the database). 
>
> If the container crashes before it has acknowledged the message, the 
> message will be redelivered. If we encounter 
> an error, we pass the message to an error handling destination or a retry 
> destination. 
>
>
> Apparently the architecture and the acceptance level of message loss 
> changes when switching to a streaming approach. 
>
>
> For now I have some concrete questions:
>
> 1) Have I missed the Camel replacement in Alpakka and if so, where is it 
> located within Alpakka ?
> 2) How are others coping with the "window of potential message loss" when 
> migrating from pure JMS flows to streams ?
> 3) Any pointers to good hands-on white papers are much appreciated. I have 
> read through some and also through most 
> of the streams documentation, but I guess I need to get my hands dirty 
> ... 
> 4) I don't dare to ask, but if anyone is using Akka / AkkaHttp and / or 
> AkkaStream within OSGi I would be more than happy 
> to exchange experiences & ideas. 
>
> [1] https://github.com/woq-blended/blended
>
>
> Thanks in advance for your attention 
> Best regards 
> Andreas
>
