Hi!

I would try to dig, bit by bit, into where the bottleneck is:

 1) Disable the checkpointing and see what difference that makes.
 2) Use a dummy (discarding) sink rather than Elasticsearch, to see if
the sink is the limiting factor. A sketch of both changes follows this
list.
 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
and easily dominate the entire pipeline; an alternative parser is
sketched below as well.
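
For steps 1 and 2, here is a minimal sketch of such an isolation run.
It assumes the same job skeleton, `properties`, and Kafka topic as in
your code below, and that DiscardingSink ships with your Flink version;
everything else is unchanged:

---
    import java.util.concurrent.TimeUnit

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    import scala.util.parsing.json.JSON

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // (1) checkpointing deliberately NOT enabled for this run

    env
      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
        new SimpleStringSchema(), properties))
      .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
      .map { x => (x.get("uri").map(_.asInstanceOf[String]).getOrElse(""), 1) }
      .keyBy(0)
      .timeWindow(Time.of(1, TimeUnit.MINUTES))
      .sum(1)
      // (2) discard the results instead of writing to Elasticsearch
      .addSink(new DiscardingSink[(String, Int)])

    env.execute("bottleneck isolation test")
---

If throughput jumps without the Elasticsearch sink, the sink (or its
flushing behavior) is the limit; if it does not, look upstream.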
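
For step 3: scala.util.parsing.json.JSON in particular is known to be
very slow. As an illustration only (Jackson is not something your job
already uses, so this is an assumption), parsing with a reused Jackson
ObjectMapper would look roughly like this, where `stream` stands for
the raw DataStream[String] coming out of the Kafka source:

---
    import com.fasterxml.jackson.databind.ObjectMapper

    // Holder object so every task reuses a single ObjectMapper instead
    // of serializing one into the map closure.
    object Json {
      val mapper = new ObjectMapper()
    }

    // Replaces the two map steps that parse the JSON and extract the URI.
    val uriCounts = stream.map { line =>
      val node = Json.mapper.readTree(line)
      val uri = Option(node.get("uri")).map(_.asText).getOrElse("")
      (uri, 1)
    }
---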

Greetings,
Stephan


On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:

> Hello,
>
> I started evaluating Flink and tried a simple performance test.
> The result was just about 4,000 messages/sec with 300% CPU usage. I
> think this is quite low, and I am wondering whether it is a reasonable
> result. It would be great if someone could check it.
>
> Here is the detail:
>
> [servers]
> - 3 Kafka broker with 3 partitions
> - 3 Flink TaskManager + 1 JobManager
> - 1 Elasticsearch
> Each of them is a separate VM with 8 vCPUs and 8 GB of memory.
>
> [test case]
> The application counts access logs by URI within a 1-minute window and
> sends the result to Elasticsearch. The actual code is below.
> I used the '-p 3' option with the flink run command, so the task was
> distributed across the 3 TaskManagers.
> In the test, I sent about 5000 logs/sec to Kafka.
>
> [result]
> - From the Elasticsearch records, the total access count across all
> URIs was about 260,000/min = 4,300/sec. This is the overall throughput.
> - The Kafka consumer lag kept growing.
> - The CPU usage of each TaskManager machine was about 13-14%. From the
> top command output, the Flink java process was using 100% (one full CPU).
>
> So I thought the bottleneck here was the CPU used by the Flink tasks.
>
> Here is the application code.
> ---
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(1000)
> ...
>     val stream = env
>       .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>         new SimpleStringSchema(), properties))
>       .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>       .map { x => x.get("uri") match {
>         case Some(y) => (y.asInstanceOf[String], 1)
>         case None => ("", 1)
>       }}
>       .keyBy(0)
>       .timeWindow(Time.of(1, TimeUnit.MINUTES))
>       .sum(1)
>       .map { x => (System.currentTimeMillis(), x) }
>       .addSink(new ElasticsearchSink(config, transports,
>         new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>           override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>               ctx: RuntimeContext): IndexRequest = {
>             val json = new HashMap[String, AnyRef]
>             json.put("@timestamp", new Timestamp(element._1))
>             json.put("uri", element._2._1)
>             json.put("count", element._2._2: java.lang.Integer)
>             println("SENDING: " + element)
>             Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>           }
>         }))
> ---
>
> Regards,
> Hironori Ogibayashi
>
