[akka-user] Re: Question about Reactive Kafka performances

2017-06-27 Thread Kilic Ali-Firat
Alex, you're right about the parameter *receive.buffer.bytes*. With its default value, my consumer throughput is about 3,000 msg/sec while my producer sends about 5,000 msg/sec. When I set receive.buffer.bytes to a higher value, the consumer reaches the same throughput as the producer. Th

[akka-user] Re: Question about Reactive Kafka performances

2017-06-26 Thread Alex Cozzi
Great! About performance: I tried tweaking a lot of parameters, but the one I found to have the most influence on read-side throughput seems to be "receive.buffer.bytes". The optimum varies depending on your image size and other factors, but you can try a bit of parameter search. In my
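As an illustration of the setting discussed above, here is a minimal sketch of how "receive.buffer.bytes" might be passed to a Reactive Kafka consumer. It assumes the akka-stream-kafka `ConsumerSettings` API and the kafka-clients `ConsumerConfig` constants; the broker address, group id, object name and candidate buffer sizes are hypothetical placeholders, not values from this thread.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object ReceiveBufferSketch {
  // Hypothetical candidate sizes (in bytes) to try during a parameter search.
  val candidateSizes: Seq[Int] = Seq(64 * 1024, 256 * 1024, 1024 * 1024)

  // Builds consumer settings with an explicit socket receive buffer size.
  // "localhost:9092" and "throughput-test" are placeholders for illustration.
  def settings(system: ActorSystem, receiveBufferBytes: Int): ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("throughput-test")
      // RECEIVE_BUFFER_CONFIG is the constant for the "receive.buffer.bytes" key.
      .withProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes.toString)
}
```

One way to run the search would be to build one consumer per entry in `candidateSizes`, measure msg/sec against the same topic, and keep the size that gives the best throughput for your message sizes.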

[akka-user] Re: Question about Reactive Kafka performances

2017-06-26 Thread Kilic Ali-Firat
Hi Alex, I forgot to include the security files needed to connect to my Kafka broker. Using your build.sbt (Scala version 2.11 and reactive kafka 0.12), which I modified, I can reach a throughput between 4,000 and 5,000 msg/sec. For now, that is enough for me. Many thanks for sharing your problems and how you f

[akka-user] Re: Question about Reactive Kafka performances

2017-06-25 Thread Kilic Ali-Firat
I tried to add this bound on the fetch bytes but it didn't work either. I'm using Java 1.8.0_121 and I tested something else: I'm listening to a topic to which no data has been sent and I get the same error! I also tried with my old consumer in Scala 2.11.7 and I get the same error. Le dimanch

[akka-user] Re: Question about Reactive Kafka performances

2017-06-25 Thread Alex Cozzi
Your problem seems related to running out of memory. Have you tried removing: .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "200")? But if you are running on Java 8 you should not hit the out-of-memory error.

[akka-user] Re: Question about Reactive Kafka performances

2017-06-24 Thread Kilic Ali-Firat
Keeping the minimal dependencies of your build.sbt didn't change the error I get:

    scalaVersion := "2.12.2"
    val akkaV = "2.4.17"
    val kafkaV = "0.10.0.1"
    val reactiveKafkaV = "0.13"
    libraryDependencies ++= Seq(
      "org.apache.kafka" % "kafka-clients" % kafkaV,
      "org.apache.kafka" % "kafka