Aljoscha,

Thank you for your response.

I tried the case with no JSON parsing and no sink (DiscardingSink).
The throughput was 8,228 msg/sec, much better than the JSON +
Elasticsearch case (1,480 msg/sec) and slightly better than the
JSON-to-TSV case (8,000 msg/sec).
I also tried using socketTextStream instead of FlinkKafkaConsumer. In
that case, the result was 60,000 msg/sec with just 40% Flink
TaskManager CPU usage (the socket server was the bottleneck).
That was amazing, although Flink's fault tolerance feature is not
available with socketTextStream.
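
For reference, the socket test looked roughly like this (a minimal
sketch, not the exact code I ran; the host/port are placeholders and I
am assuming the Scala API with DiscardingSink from
org.apache.flink.streaming.api.functions.sink):

---
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Plain socket source instead of Kafka. Note that no checkpointing /
// fault tolerance is possible with this source.
env
  .socketTextStream("loghost", 9999) // placeholder host/port
  .addSink(new DiscardingSink[String]) // no-op sink, discards every element

env.execute("socket throughput test")
---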

Regards,
Hironori

2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> Another interesting test would be a combination of 3) and 2). I.e. no JSON 
> parsing and no sink. This would show what the raw throughput can be before 
> being slowed down by writing to Elasticsearch.
>
> Also, .print() is not feasible for production since it just prints every
> element to the stdout log on the TaskManagers, which itself can cause quite a
> slowdown. You could try:
>
> datastream.addSink(new DiscardingSink())
>
> which is a dummy sink that does nothing.
>
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:31, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Stephan,
>>
>> Sorry for the delay in my response.
>> I tried the 3 cases you suggested.
>>
>> This time, I set parallelism to 1 for simplicity.
>>
>> 0) Base performance (same as the first e-mail): 1,480 msg/sec
>> 1) Disabled checkpointing: almost the same as 0)
>> 2) No ES sink, just print(): 1,510 msg/sec
>> 3) JSON to TSV: 8,000 msg/sec
>>
>> So, as you can see, the bottleneck was JSON parsing. I also want to
>> try eliminating Kafka to see if there is room to improve performance.
>> (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9; I think I
>> should try Flink 1.0 and FlinkKafkaConsumer09.)
>> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad, considering
>> Flink's scalability and fault tolerance.
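>>
>> Another thing I may try is a faster JSON library. A rough sketch of
>> what I have in mind, assuming Jackson with its Scala module (I have
>> not tested this yet; "stream" stands for the DataStream[String] from
>> the Kafka source):
>>
>> ---
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.fasterxml.jackson.module.scala.DefaultScalaModule
>>
>> // One shared mapper; Jackson's ObjectMapper is serializable, so the
>> // map function's closure can capture it.
>> val mapper = new ObjectMapper()
>> mapper.registerModule(DefaultScalaModule)
>>
>> // Drop-in replacement for the JSON.parseFull map step:
>> val parsed = stream.map { line =>
>>   mapper.readValue(line, classOf[Map[String, AnyRef]])
>> }
>> ---
>>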
>> Thank you for your advice.
>>
>> Regards,
>> Hironori Ogibayashi
>>
>> 2016-02-26 21:46 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>>> Stephan,
>>>
>>> Thank you for your quick response.
>>> I will try and post the result later.
>>>
>>> Regards,
>>> Hironori
>>>
>>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>>>> Hi!
>>>>
>>>> I would dig in bit by bit to see what the bottleneck is:
>>>>
>>>> 1) Disable the checkpointing, see what difference that makes
>>>> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
>>>> is the limiting factor
>>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>>>> easily dominate the entire pipeline.
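>>>>
>>>> For 3), a quick standalone check of the parser alone can be telling.
>>>> A rough sketch (outside Flink, using the same scala.util.parsing.json
>>>> parser as in your job; the sample record is made up):
>>>>
>>>> ---
>>>> import scala.util.parsing.json.JSON
>>>>
>>>> // Parse a representative record in a tight loop and report the rate.
>>>> val sample = """{"uri":"/index.html","status":"200"}"""
>>>> val n = 100000
>>>> val t0 = System.nanoTime()
>>>> for (_ <- 1 to n) JSON.parseFull(sample)
>>>> val rate = n / ((System.nanoTime() - t0) / 1e9)
>>>> println(f"parseFull: $rate%.0f msg/sec on one core")
>>>> ---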
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 11:23 AM, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I started evaluating Flink and tried a simple performance test.
>>>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>>>> think this is quite low and am wondering whether it is a reasonable
>>>>> result. If someone could check it, that would be great.
>>>>>
>>>>> Here is the detail:
>>>>>
>>>>> [servers]
>>>>> - 3 Kafka brokers with 3 partitions
>>>>> - 3 Flink TaskManager + 1 JobManager
>>>>> - 1 Elasticsearch
>>>>> All of them are separate VMs, each with 8 vCPUs and 8GB of memory
>>>>>
>>>>> [test case]
>>>>> The application counts access logs by URI within a 1-minute window and
>>>>> sends the results to Elasticsearch. The actual code is below.
>>>>> I used the '-p 3' option with the flink run command, so the task was
>>>>> distributed across the 3 TaskManagers.
>>>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>>>
>>>>> [result]
>>>>> - From the Elasticsearch records, the total access count across all
>>>>> URIs was about 260,000/min = 4,300/sec. This is the overall throughput.
>>>>> - The Kafka consumer lag kept growing.
>>>>> - The CPU usage of each TaskManager machine was about 13-14%. From the
>>>>> top command output, the Flink java process was using 100% (1 full CPU).
>>>>>
>>>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>>>>
>>>>> Here is the application code.
>>>>> ---
>>>>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>    env.enableCheckpointing(1000)
>>>>> ...
>>>>>    val stream = env
>>>>>      // Kafka source emitting raw JSON strings
>>>>>      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
>>>>>        new SimpleStringSchema(), properties))
>>>>>      // Parse each line as JSON (.get throws if parsing fails)
>>>>>      .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
>>>>>      // Extract the "uri" field, defaulting to ""
>>>>>      .map{ x => x.get("uri") match {
>>>>>        case Some(y) => (y.asInstanceOf[String], 1)
>>>>>        case None => ("", 1)
>>>>>      }}
>>>>>      // Count per URI in 1-minute windows
>>>>>      .keyBy(0)
>>>>>      .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>>>      .sum(1)
>>>>>      .map{ x => (System.currentTimeMillis(), x)}
>>>>>      // Index each windowed count into Elasticsearch
>>>>>      .addSink(new ElasticsearchSink(config, transports,
>>>>>          new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
>>>>>        override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
>>>>>            ctx: RuntimeContext): IndexRequest = {
>>>>>          val json = new HashMap[String, AnyRef]
>>>>>          json.put("@timestamp", new Timestamp(element._1))
>>>>>          json.put("uri", element._2._1)
>>>>>          json.put("count", element._2._2: java.lang.Integer)
>>>>>          println("SENDING: " + element) // prints every element to stdout
>>>>>          Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>>>>        }
>>>>>      }))
>>>>> ---
>>>>>
>>>>> Regards,
>>>>> Hironori Ogibayashi
>>>>
>>>>
>
