Re: [akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
I figured it out. Submitting a pull request!

On Tuesday, December 6, 2016 at 12:29:28 PM UTC-7, Konrad Malawski wrote:
>
> Is the JSON well formed and "normal" or something weird or maybe huge 
> objects or something in there etc?
> Try to debug at which point it gets stuck.
>
> A minimized reproducer would help.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 6 December 2016 at 20:26:43, sub...@gmail.com  (
> sub...@gmail.com ) wrote:
>
> I working on a Akka Streams project that reads gzipped files from S3 and 
> parses json. The issue I'm running into the stream stalls at about 24523530 
> bytes and then times-out after a 1 minute 
> (java.util.concurrent.TimeoutException: No elements passed in the last 1 
> minute), but there is no error otherwise. If I remove the JsonFraming.
> objectScanner from the flow, it does not exhibit this behavior. The 
> stream looks like:
>
> val s3FileSource: Source[ByteString, NotUsed] = 
> s3FileNamesSource.via(Flows.downloadObjectsFlow(s3Client, bucket))
>
>
>   val result = s3FileSource
> .via(Compression.gunzip(100))
> .via(JsonFraming.objectScanner(Int.MaxValue))
> .runWith(FileIO.toPath(Paths.get("stream.out"), Set(CREATE, WRITE)))
>
>
> The json that is does write to the file looks just fine. I'm on Akka 2.4.14. 
> Any ideas on what might be going on or how to troubleshoot? Thanks,
>
>
> Andrew
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Persistence with Avro

2016-12-06 Thread Richard Rodseth
Anyone else using Avro and avro4s to serialize persistence events? I was
able to get the SerializerWithStringManifest below working (with generic
serialize and deserialize methods). Should I be extending something other
than SerializerWithStringManifest, given that Avro on its own helps with
schema evolution?

Note: avro4s seems very promising, although I did run into this limitation
https://github.com/sksamuel/avro4s/issues/75

class PersistenceEventsAvroSerializer extends SerializerWithStringManifest {

  val FooManifest = "Foo"

  val BarManifest = "Bar"

  def identifier = 1234567

 override def manifest(obj: AnyRef): String =

obj match {

  case _: Bar => BarManifest

  case _: Foo => FooManifest

}

 override def toBinary(obj: AnyRef): Array[Byte] = {

   obj match {

  case foo: Foo => serialize[Foo](foo)

  case bsr: Bar => serialize[Bar](bar)

}

  }

 override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

manifest match {

  case FooManifest => deserialize[Foo](bytes)

  case BarManifest => deserialize[Bar](bytes)

}

  }

  private def serialize[T](data: T)(implicit s: SchemaFor[T], r:
ToRecord[T]): Array[Byte] = {

val baos = new ByteArrayOutputStream

val output = AvroOutputStream.binary[T](baos)

output.write(data)

output.close()

val result = baos.toByteArray()

result

  }


  private def deserialize[T](data: Array[Byte])(implicit s: SchemaFor[T],
r: FromRecord[T]): T = {


val input = AvroInputStream.binary[T](data)

val result: T = if (input.iterator.isEmpty) {

  ??? // TODO Check

} else {

  input.iterator.toSeq.head

}

result

  }


}

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread Konrad Malawski
Is the JSON well formed and "normal" or something weird or maybe huge
objects or something in there etc?
Try to debug at which point it gets stuck.

A minimized reproducer would help.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 6 December 2016 at 20:26:43, subo...@gmail.com (subo...@gmail.com) wrote:

I working on a Akka Streams project that reads gzipped files from S3 and
parses json. The issue I'm running into the stream stalls at about 24523530
bytes and then times-out after a 1 minute
(java.util.concurrent.TimeoutException: No elements passed in the last 1
minute), but there is no error otherwise. If I remove the JsonFraming.
objectScanner from the flow, it does not exhibit this behavior. The stream
looks like:

val s3FileSource: Source[ByteString, NotUsed] =
s3FileNamesSource.via(Flows.downloadObjectsFlow(s3Client, bucket))


  val result = s3FileSource
.via(Compression.gunzip(100))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runWith(FileIO.toPath(Paths.get("stream.out"), Set(CREATE, WRITE)))


The json that is does write to the file looks just fine. I'm on Akka
2.4.14. Any ideas on what might be going on or how to troubleshoot?
Thanks,


Andrew


--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
I working on a Akka Streams project that reads gzipped files from S3 and 
parses json. The issue I'm running into the stream stalls at about 24523530 
bytes and then times-out after a 1 minute 
(java.util.concurrent.TimeoutException: No elements passed in the last 1 
minute), but there is no error otherwise. If I remove the JsonFraming.
objectScanner from the flow, it does not exhibit this behavior. The stream 
looks like:

val s3FileSource: Source[ByteString, NotUsed] = 
s3FileNamesSource.via(Flows.downloadObjectsFlow(s3Client, bucket))


  val result = s3FileSource
.via(Compression.gunzip(100))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runWith(FileIO.toPath(Paths.get("stream.out"), Set(CREATE, WRITE)))


The json that is does write to the file looks just fine. I'm on Akka 2.4.14. 
Any ideas on what might be going on or how to troubleshoot? Thanks,


Andrew


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Cluster Sharding custom routing

2016-12-06 Thread Luis Pedrosa


Hello!


>From what I understand, the cluster sharder GracefulShutdown 
implementation, removes the node from the cluster. It also signals all the 
remaining nodes to buffer any messages that target shards already allocated 
on the given node.


I believe this is because the use case assumes that you will migrate the 
entities accordingly before the node is removed from the cluster.


I have a different use case in mind. I would like to signal the sharder 
that a node shouldn't be considered for any new allocation. However, It 
should still be able to accept any message for pre-allocated shards.


The reason is that my entities are not ready for persistence (something I 
would like to address at a later stage). Given that, migrating them is not 
a desirable option.


I was wondering, is there a way to make this use case work with the cluster 
sharder mechanics?

Should I consider implementing a simple clustered routing solution that 
satisfies our use case?

Or is there another solution provided by the akka-cluster extensions that I 
should be looking at?


In conclusion, on the ClusterSharder, a new shard is allocated to a node 
that is currently up. I would like to control this allocation, for a given 
node:

   - By still being able to accept messages for shards that are already 
   allocated in the node
   - By not allowing any new shards to be allocated in the node

Thank you in advance!

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Java TestProbe with ReceiveWhile

2016-12-06 Thread Allan Brighton
In the Akka Scala API you can call receiveWhile to receive messages sent to 
a TestProbe:

val msgs = testProbe.receiveWhile(5.seconds) {
  case x *=> ...*

} 

In the Java API you can do something similar, but I could not find a way to 
use ReceiveWhile with a TestProbe:

final CurrentState[] msgs =
  new ReceiveWhile(CurrentState.class, duration("5 seconds")) {
protected CurrentState match(Object in) {
  if (in instanceof CurrentState) {
CurrentState cs = (CurrentState) in;

// ...
return cs;
  }
  throw noMatch();
}
  }.get(); // this extracts the received messages


Is there a way to do this in the Java API where the messages are sent to a 
TestProbe?


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] (akka-http) server.max-connections vs max socket opening/s

2016-12-06 Thread Konrad Malawski
Before using ab I was stressing the app with Gatling which does not reuse
connections between users. In order to simplify my demonstration I switched
to ab without keepalive to keep the same scenario. I will have a look to
wrk.

Please do remember to benchmark not the same box on which you're running
the load generator by the way... Otherwise you're competing for resources
between benchmark runner and benchmarked app ;-)

If I understand correctly the end of your answer, the use case which
consist to open many tcp connections at the same time is currently not
optimised by akka-http materialization and this is what I observe on my
system when I reach a limit of around 1200 opening sockets/s, is that right
?

Yeah, most likely. Though honestly that's plenty anyway for most apps –
connections are expensive and are kept around in polls by all known-to-me
serious clients (including browsers, http client libraries etc).

-- Konrad

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] (akka-http) server.max-connections vs max socket opening/s

2016-12-06 Thread Charley Kayser
Thanks for answering so quickly.
Indeed the performance is much better if I reuse tcp connections.

Before using ab I was stressing the app with Gatling which does not reuse 
connections between users. In order to simplify my demonstration I switched 
to ab without keepalive to keep the same scenario. I will have a look to 
wrk.

If I understand correctly the end of your answer, the use case which 
consist to open many tcp connections at the same time is currently not 
optimised by akka-http materialization and this is what I observe on my 
system when I reach a limit of around 1200 opening sockets/s, is that right 
?

Thanks again.


Le mardi 6 décembre 2016 10:51:56 UTC+1, Konrad Malawski a écrit :
>
> Your benchmark is likely unrealistic - it should use consistent 
> connections to give any kind of realistic numbers (clients always keep 
> connections).
> ab is also not the best tool around (hasn't been since a long time), 
> please try `wrk`.
>
> Your benchmark is effectively benchmarking materialization time of new 
> connections - which is a known worst case scenario foe Akka HTTP.
> We're in the middle of optimising this currently by the way, with faster 
> materialization, however it still is not realistic if you want to measure 
> RPS - then you should use ab with -k or use a proper tool like wrk.
>
> Happy hakking.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 6 December 2016 at 10:42:42, Charley Kayser (kayser@gmail.com 
> ) wrote:
>
> Hi
>
> I'm testing akka-http and I try to measure how many concurrent connections 
> it can handle on my company's Linux VM (x86_64, 4 CPU).
> I use the simple http server implementation provided in the akka-http 
> documentation which you can find in the following github project created 
> for illustrating this ticket: https://github.com/jycr/simple-akka-http
>
> Using ApacheBench to stress the application (with option -c 2000), I 
> measure that the server can't handle more than around 1000 requests per 
> second.
> Without success I changed the following to try to go over this limit:
>
>- updated the Linux settings to allow more than 4000 TCP concurrent 
>connections (ulimit -n, sysctl net.core.somaxconn, ifconfig txqueuelen, 
>sysctl net.core.netdev_max_backlog, sysctl net.ipv4.tcp_max_syn_backlog)
>- set akka.http.server.max-connections to 2048
>
> I used YourKit to monitor the app and I could see the number of open 
> sockets didn't go over the limit of 1200 open sockets per second. This 
> makes me think the problem could be located in akka-io or in some kind of 
> back-pressure feature which tell the underlying nio.Selector not to accept 
> too many connections at once.
>
> What am I missing here ? 
>
> Thanks for your help
>
> Attached a screenshot of YourKit showing the socket metrics
>
>
>
> 
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] invalid materialized value

2016-12-06 Thread Endre Varga
Yes, this is a bug. I filed a ticket:
https://github.com/akka/akka/issues/21955

On Tue, Dec 6, 2016 at 10:39 AM, Endre Varga 
wrote:

> Hi Stefan,
>
> What version are you using? This might be a bug.
>
> -Endre
>
> On Tue, Dec 6, 2016 at 10:13 AM, EXTERNAL Wachter Stefan (Keybird IT
> Consulting und Vetriebs GmbH, INST-ICM/BSV-BS)  bosch-si.com> wrote:
>
>> Hi all,
>>
>>
>>
>> the following line of code
>>
>>
>>
>>
>>
>> *val *((pub, mat), sub) = *TestSource*.*probe*[Int].viaMat(*Flow*
>> [Int])(Keep.*both*).toMat(*TestSink*.*probe*)(Keep.*both*).run()
>>
>>
>>
>> throws a ClassCastException:
>>
>>
>>
>>
>>
>> java.lang.ClassCastException: akka.stream.testkit.TestPublisher$Probe
>> cannot be cast to scala.Tuple2
>>
>>
>>
>> it seems that the viaMat(...)(Keep.both) does not what it promises.
>>
>>
>>
>> Mit freundlichen Grüßen / Best regards
>>
>>
>>
>> *Dr. Stefan Wachter*
>>
>>
>>
>> Bosch Software Innovations GmbH
>>
>> Communications (INST-ICM/BSV-BS)
>>
>> Stuttgarterstr. 130
>>
>> 71332 Waiblingen
>>
>> GERMANY
>>
>> www.bosch-si.de
>>
>> www.blog.bosch-si.com
>>
>>
>>
>> Tel. +49 711 811 58 477
>>
>> Fax +49 711 811 58 100
>>
>> external.stefan.wach...@bosch-si.com 
>>
>>
>>
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] (akka-http) server.max-connections vs max socket opening/s

2016-12-06 Thread Konrad Malawski
Your benchmark is likely unrealistic - it should use consistent connections
to give any kind of realistic numbers (clients always keep connections).
ab is also not the best tool around (hasn't been since a long time), please
try `wrk`.

Your benchmark is effectively benchmarking materialization time of new
connections - which is a known worst case scenario foe Akka HTTP.
We're in the middle of optimising this currently by the way, with faster
materialization, however it still is not realistic if you want to measure
RPS - then you should use ab with -k or use a proper tool like wrk.

Happy hakking.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 6 December 2016 at 10:42:42, Charley Kayser (kayser.char...@gmail.com)
wrote:

Hi

I'm testing akka-http and I try to measure how many concurrent connections
it can handle on my company's Linux VM (x86_64, 4 CPU).
I use the simple http server implementation provided in the akka-http
documentation which you can find in the following github project created
for illustrating this ticket: https://github.com/jycr/simple-akka-http

Using ApacheBench to stress the application (with option -c 2000), I
measure that the server can't handle more than around 1000 requests per
second.
Without success I changed the following to try to go over this limit:

   - updated the Linux settings to allow more than 4000 TCP concurrent
   connections (ulimit -n, sysctl net.core.somaxconn, ifconfig txqueuelen,
   sysctl net.core.netdev_max_backlog, sysctl net.ipv4.tcp_max_syn_backlog)
   - set akka.http.server.max-connections to 2048

I used YourKit to monitor the app and I could see the number of open
sockets didn't go over the limit of 1200 open sockets per second. This
makes me think the problem could be located in akka-io or in some kind of
back-pressure feature which tell the underlying nio.Selector not to accept
too many connections at once.

What am I missing here ?

Thanks for your help

Attached a screenshot of YourKit showing the socket metrics





--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] (akka-http) server.max-connections vs max socket opening/s

2016-12-06 Thread Charley Kayser
Hi

I'm testing akka-http and I try to measure how many concurrent connections 
it can handle on my company's Linux VM (x86_64, 4 CPU).
I use the simple http server implementation provided in the akka-http 
documentation which you can find in the following github project created 
for illustrating this ticket: https://github.com/jycr/simple-akka-http

Using ApacheBench to stress the application (with option -c 2000), I 
measure that the server can't handle more than around 1000 requests per 
second.
Without success I changed the following to try to go over this limit:

   - updated the Linux settings to allow more than 4000 TCP concurrent 
   connections (ulimit -n, sysctl net.core.somaxconn, ifconfig txqueuelen, 
   sysctl net.core.netdev_max_backlog, sysctl net.ipv4.tcp_max_syn_backlog)
   - set akka.http.server.max-connections to 2048
   
I used YourKit to monitor the app and I could see the number of open 
sockets didn't go over the limit of 1200 open sockets per second. This 
makes me think the problem could be located in akka-io or in some kind of 
back-pressure feature which tell the underlying nio.Selector not to accept 
too many connections at once.

What am I missing here ? 

Thanks for your help

Attached a screenshot of YourKit showing the socket metrics





-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] invalid materialized value

2016-12-06 Thread Endre Varga
Hi Stefan,

What version are you using? This might be a bug.

-Endre

On Tue, Dec 6, 2016 at 10:13 AM, EXTERNAL Wachter Stefan (Keybird IT
Consulting und Vetriebs GmbH, INST-ICM/BSV-BS) <
external.stefan.wach...@bosch-si.com> wrote:

> Hi all,
>
>
>
> the following line of code
>
>
>
>
>
> *val *((pub, mat), sub) = *TestSource*.*probe*[Int].viaMat(*Flow*
> [Int])(Keep.*both*).toMat(*TestSink*.*probe*)(Keep.*both*).run()
>
>
>
> throws a ClassCastException:
>
>
>
>
>
> java.lang.ClassCastException: akka.stream.testkit.TestPublisher$Probe
> cannot be cast to scala.Tuple2
>
>
>
> it seems that the viaMat(...)(Keep.both) does not what it promises.
>
>
>
> Mit freundlichen Grüßen / Best regards
>
>
>
> *Dr. Stefan Wachter*
>
>
>
> Bosch Software Innovations GmbH
>
> Communications (INST-ICM/BSV-BS)
>
> Stuttgarterstr. 130
>
> 71332 Waiblingen
>
> GERMANY
>
> www.bosch-si.de
>
> www.blog.bosch-si.com
>
>
>
> Tel. +49 711 811 58 477
>
> Fax +49 711 811 58 100
>
> external.stefan.wach...@bosch-si.com 
>
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] invalid materialized value

2016-12-06 Thread EXTERNAL Wachter Stefan (Keybird IT Consulting und Vetriebs GmbH, INST-ICM/BSV-BS)
Hi all,

the following line of code


val ((pub, mat), sub) = 
TestSource.probe[Int].viaMat(Flow[Int])(Keep.both).toMat(TestSink.probe)(Keep.both).run()

throws a ClassCastException:


java.lang.ClassCastException: akka.stream.testkit.TestPublisher$Probe cannot be 
cast to scala.Tuple2

it seems that the viaMat(...)(Keep.both) does not what it promises.

Mit freundlichen Grüßen / Best regards

Dr. Stefan Wachter

Bosch Software Innovations GmbH
Communications (INST-ICM/BSV-BS)
Stuttgarterstr. 130
71332 Waiblingen
GERMANY
www.bosch-si.de
www.blog.bosch-si.com

Tel. +49 711 811 58 477
Fax +49 711 811 58 100
external.stefan.wach...@bosch-si.com

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.