Robert,

Thank you for your response. I will also try running the Kafka brokers
on physical servers.

I would like to try the kafka-console-consumer, but I am not sure how
to measure its consuming throughput. Is there a standard way?
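For now, I am planning to try something like the following (just my
guess at an ad-hoc approach; it assumes the stock Kafka CLI tools on a
broker machine, and 'zkhost' is a placeholder for our ZooKeeper host):

---
# Count how many messages kafka-console-consumer can read in 60 seconds
timeout 60 bin/kafka-console-consumer.sh --zookeeper zkhost:2181 \
    --topic kafka.dummy --from-beginning | wc -l

# Kafka's bundled perf tool also reports consumer throughput directly
bin/kafka-consumer-perf-test.sh --zookeeper zkhost:2181 \
    --topic kafka.dummy --messages 1000000
---

If either number is far above what Flink achieves, the brokers are
probably not the bottleneck.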
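Also, since JSON parsing turned out to be the dominant cost, I am
thinking of replacing scala.util.parsing.json.JSON with Jackson. A
rough, untested sketch of what I have in mind (it assumes
jackson-databind is on the classpath; 'FastJson' is just a name I made
up):

---
import com.fasterxml.jackson.databind.ObjectMapper

object FastJson {
  // ObjectMapper is expensive to create but thread-safe, so reuse one instance
  val mapper = new ObjectMapper()

  // extract the "uri" field, falling back to "" when it is missing
  def parseUri(json: String): String = {
    val uri = mapper.readTree(json).get("uri")
    if (uri == null) "" else uri.asText()
  }
}
---

With this, the two map steps in my job would collapse into one:

---
.map{ json => (FastJson.parseUri(json), 1) }
---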
Regarding the version, I have upgraded to Flink 1.0.0 and replaced
FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see any
difference in performance.

Regards,
Hironori

2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> Hi Hironori,
>
> Can you check with the kafka-console-consumer how many messages you can
> read in one minute?
> Maybe the broker's disk I/O is limited because everything is running in
> virtual machines (potentially sharing one hard disk?)
> I'm also not sure whether running a Kafka 0.8 consumer against a 0.9
> broker works as expected.
>
> Our Kafka 0.8 consumer has been tested in environments where it reads at
> more than 100 MB/s from a broker.
>
>
> On Fri, Mar 11, 2016 at 9:33 AM, Hironori Ogibayashi
> <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for your response.
>>
>> I tried the no-JSON-parsing and no-sink (DiscardingSink) case. The
>> throughput was 8,228 msg/sec, slightly better than the JSON +
>> Elasticsearch case.
>> I also tried using socketTextStream instead of FlinkKafkaConsumer; in
>> that case, the result was 60,000 msg/sec with just 40% Flink
>> TaskManager CPU usage (the socket server was the bottleneck).
>> That was amazing, although Flink's fault-tolerance feature is not
>> available with socketTextStream.
>>
>> Regards,
>> Hironori
>>
>> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> > Hi,
>> > Another interesting test would be a combination of 3) and 2), i.e. no
>> > JSON parsing and no sink. This would show what the raw throughput can
>> > be before being slowed down by writing to Elasticsearch.
>> >
>> > Also, .print() is not feasible for production since it just prints
>> > every element to the stdout log on the TaskManagers, which itself can
>> > cause quite a slowdown. You could try:
>> >
>> > datastream.addSink(new DiscardingSink())
>> >
>> > which is a dummy sink that does nothing.
>> >
>> > Cheers,
>> > Aljoscha
>> >> On 08 Mar 2016, at 13:31, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>> >>
>> >> Stephan,
>> >>
>> >> Sorry for the delay in my response.
>> >> I tried the 3 cases you suggested.
>> >>
>> >> This time, I set parallelism to 1 for simplicity.
>> >>
>> >> 0) Base performance (same as the first e-mail): 1,480 msg/sec
>> >> 1) Disabled checkpointing: almost the same as 0)
>> >> 2) No ES sink, just print(): 1,510 msg/sec
>> >> 3) JSON to TSV: 8,000 msg/sec
>> >>
>> >> So, as you can see, the bottleneck was JSON parsing. I also want to
>> >> try eliminating Kafka to see if there is room to improve performance.
>> >> (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9;
>> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad, considering
>> >> Flink's scalability and fault tolerance.
>> >> Thank you for your advice.
>> >>
>> >> Regards,
>> >> Hironori Ogibayashi
>> >>
>> >> 2016-02-26 21:46 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>> >>> Stephan,
>> >>>
>> >>> Thank you for your quick response.
>> >>> I will try and post the result later.
>> >>>
>> >>> Regards,
>> >>> Hironori
>> >>>
>> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> >>>> Hi!
>> >>>>
>> >>>> I would try to dig in bit by bit to find the bottleneck:
>> >>>>
>> >>>> 1) Disable the checkpointing and see what difference that makes.
>> >>>> 2) Use a dummy (discarding) sink rather than Elasticsearch, to see
>> >>>> if that is limiting.
>> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU
>> >>>> intensive and easily dominate the entire pipeline.
>> >>>>
>> >>>> Greetings,
>> >>>> Stephan
>> >>>>
>> >>>>
>> >>>> On Fri, Feb 26, 2016 at 11:23 AM, Hironori Ogibayashi
>> >>>> <ogibaya...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> I started evaluating Flink and tried a simple performance test.
>> >>>>> The result was just about 4,000 messages/sec with 300% CPU usage.
>> >>>>> I think this is quite low and am wondering whether it is a
>> >>>>> reasonable result. If someone could check it, that would be great.
>> >>>>>
>> >>>>> Here are the details:
>> >>>>>
>> >>>>> [servers]
>> >>>>> - 3 Kafka brokers with 3 partitions
>> >>>>> - 3 Flink TaskManagers + 1 JobManager
>> >>>>> - 1 Elasticsearch
>> >>>>> All of them are separate VMs with 8 vCPUs and 8 GB memory.
>> >>>>>
>> >>>>> [test case]
>> >>>>> The application counts access logs by URI within a 1-minute window
>> >>>>> and sends the result to Elasticsearch. The actual code is below.
>> >>>>> I used the '-p 3' option with the flink run command, so the task
>> >>>>> was distributed to 3 TaskManagers.
>> >>>>> In the test, I sent about 5,000 logs/sec to Kafka.
>> >>>>>
>> >>>>> [result]
>> >>>>> - From the Elasticsearch records, the total access count for all
>> >>>>> URIs was about 260,000/min = 4,300/sec. This is the entire
>> >>>>> throughput.
>> >>>>> - The Kafka consumer lag kept growing.
>> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
>> >>>>> the top command output, the Flink java process was using 100%
>> >>>>> (1 full CPU).
>> >>>>>
>> >>>>> So I thought the bottleneck here was the CPU used by the Flink
>> >>>>> tasks.
>> >>>>>
>> >>>>> Here is the application code:
>> >>>>> ---
>> >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>>>> env.enableCheckpointing(1000)
>> >>>>> ...
>> >>>>> val stream = env
>> >>>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>> >>>>>     new SimpleStringSchema(), properties))
>> >>>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>> >>>>>   .map{ x => x.get("uri") match {
>> >>>>>     case Some(y) => (y.asInstanceOf[String], 1)
>> >>>>>     case None => ("", 1)
>> >>>>>   }}
>> >>>>>   .keyBy(0)
>> >>>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>> >>>>>   .sum(1)
>> >>>>>   .map{ x => (System.currentTimeMillis(), x) }
>> >>>>>   .addSink(new ElasticsearchSink(config, transports,
>> >>>>>     new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>> >>>>>       override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>> >>>>>           ctx: RuntimeContext): IndexRequest = {
>> >>>>>         val json = new HashMap[String, AnyRef]
>> >>>>>         json.put("@timestamp", new Timestamp(element._1))
>> >>>>>         json.put("uri", element._2._1)
>> >>>>>         json.put("count", element._2._2: java.lang.Integer)
>> >>>>>         println("SENDING: " + element)
>> >>>>>         Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>> >>>>>       }
>> >>>>>     }))
>> >>>>> ---
>> >>>>>
>> >>>>> Regards,
>> >>>>> Hironori Ogibayashi
>> >>>>
>> >>>>
>> >
>
>