Robert,

Thank you for your response.
I would like to try  kafka-console-consumer but I have no idea about
how to measure the consuming throughput.
Are there any standard way?
I would also try Kafka broker on physical servers.

Regarding version, I have upgraded to Flink 1.0.0 and replaced
FlinkKafkaConsumer 082 with 09, but did not see
any difference in performance.

Regards,
Hironori



2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> Hi Hironori,
>
> can you try with the kafka-console-consumer how many messages you can read
> in one minute?
> Maybe the broker's disk I/O is limited because everything is running in
> virtual machines (potentially sharing one hard disk?)
> I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
> working as expected.
>
> Our Kafka 0.8 consumer has been tested in environments where its reading
> with more than 100 MB/s per from a broker.
>
>
> On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for your response.
>>
>> I tried no JSON parsing and no sink (DiscardingSink) case. The
>> throughput was 8228msg/sec.
>> Slightly better than JSON + Elasticsearch case.
>> I also tried using socketTextStream instead of FlinkKafkaConsumer, in
>> that case, the result was
>> 60,000 msg/sec with just 40% flink TaskManager CPU Usage. (socket
>> server was the bottleneck)
>> That was amazing, although Flink's fault tolerance feature is not
>> available with socketTextStream.
>>
>> Regards,
>> Hironori
>>
>> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> > Hi,
>> > Another interesting test would be a combination of 3) and 2). I.e. no
>> > JSON parsing and no sink. This would show what the raw throughput can be
>> > before being slowed down by writing to Elasticsearch.
>> >
>> > Also .print() is also not feasible for production since it just prints
>> > every element to the stdout log on the TaskManagers, which itself can cause
>> > quite a slowdown. You could try:
>> >
>> > datastream.addSink(new DiscardingSink())
>> >
>> > which is a dummy sink that does nothing.
>> >
>> > Cheers,
>> > Aljoscha
>> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >>
>> >> Stephan,
>> >>
>> >> Sorry for the delay in my response.
>> >> I tried 3 cases you suggested.
>> >>
>> >> This time, I set parallelism to 1 for simpicity.
>> >>
>> >> 0) base performance (same as the first e-mail): 1,480msg/sec
>> >> 1) Disable checkpointing : almost same as 0)
>> >> 2) No ES sink. just print() : 1,510msg/sec
>> >> 3) JSON to TSV : 8,000msg/sec
>> >>
>> >> So, as you can see, the bottleneck was JSON parsing. I also want to
>> >> try eliminating Kafka to see
>> >> if there is a room to improve performance.(Currently, I am using
>> >> FlinkKafkaConsumer082 with Kafka 0.9
>> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09).
>> >> Anyway, I think 8,000msg/sec with 1 CPU is not so bad thinking of
>> >> Flink's scalability and fault tolerance.
>> >> Thank you for your advice.
>> >>
>> >> Regards,
>> >> Hironori Ogibayashi
>> >>
>> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
>> >>> Stephan,
>> >>>
>> >>> Thank you for your quick response.
>> >>> I will try and post the result later.
>> >>>
>> >>> Regards,
>> >>> Hironori
>> >>>
>> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> >>>> Hi!
>> >>>>
>> >>>> I would try and dig bit by bit into what the bottleneck is:
>> >>>>
>> >>>> 1) Disable the checkpointing, see what difference that makes
>> >>>> 2) Use a dummy sink (discarding) rather than elastic search, to see
>> >>>> if that
>> >>>> is limiting
>> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
>> >>>> and
>> >>>> easily dominate the entire pipeline.
>> >>>>
>> >>>> Greetings,
>> >>>> Stephan
>> >>>>
>> >>>>
>> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> I started evaluating Flink and tried simple performance test.
>> >>>>> The result was just about 4000 messages/sec with 300% CPU usage. I
>> >>>>> think this is quite low and wondering if it is a reasonable result.
>> >>>>> If someone could check it, it would be great.
>> >>>>>
>> >>>>> Here is the detail:
>> >>>>>
>> >>>>> [servers]
>> >>>>> - 3 Kafka broker with 3 partitions
>> >>>>> - 3 Flink TaskManager + 1 JobManager
>> >>>>> - 1 Elasticsearch
>> >>>>> All of them are separate VM with 8vCPU, 8GB memory
>> >>>>>
>> >>>>> [test case]
>> >>>>> The application counts access log by URI with in 1 minute window and
>> >>>>> send the result to Elasticsearch. The actual code is below.
>> >>>>> I used '-p 3' option to flink run command, so the task was
>> >>>>> distributed
>> >>>>> to 3 TaskManagers.
>> >>>>> In the test, I sent about 5000 logs/sec to Kafka.
>> >>>>>
>> >>>>> [result]
>> >>>>> - From Elasticsearch records, the total access count for all URI was
>> >>>>> about 260,000/min = 4300/sec. This is the entire throughput.
>> >>>>> - Kafka consumer lag was keep growing.
>> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
>> >>>>> top
>> >>>>> command output, Flink java process was using 100%(1 CPU full)
>> >>>>>
>> >>>>> So I thought the bottleneck here was CPU used by Flink Tasks.
>> >>>>>
>> >>>>> Here is the application code.
>> >>>>> ---
>> >>>>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>>>>    env.enableCheckpointing(1000)
>> >>>>> ...
>> >>>>>    val stream = env
>> >>>>>      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
>> >>>>> SimpleStringSchema(), properties))
>> >>>>>      .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
>> >>>>> AnyRef]] }
>> >>>>>      .map{ x => x.get("uri") match {
>> >>>>>        case Some(y) => (y.asInstanceOf[String],1)
>> >>>>>        case None => ("", 1)
>> >>>>>      }}
>> >>>>>      .keyBy(0)
>> >>>>>      .timeWindow(Time.of(1, TimeUnit.MINUTES))
>> >>>>>      .sum(1)
>> >>>>>      .map{ x => (System.currentTimeMillis(), x)}
>> >>>>>      .addSink(new ElasticsearchSink(config, transports, new
>> >>>>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
>> >>>>>        override def createIndexRequest(element: Tuple2[Long,
>> >>>>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>> >>>>>          val json = new HashMap[String, AnyRef]
>> >>>>>          json.put("@timestamp", new Timestamp(element._1))
>> >>>>>          json.put("uri", element._2._1)
>> >>>>>          json.put("count", element._2._2: java.lang.Integer)
>> >>>>>          println("SENDING: " + element)
>> >>>>>
>> >>>>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>> >>>>>        }
>> >>>>>      }))
>> >>>>> ---
>> >>>>>
>> >>>>> Regards,
>> >>>>> Hironori Ogibayashi
>> >>>>
>> >>>>
>> >
>
>

Reply via email to