Hi Arvid,

The HTTP client is not my bottleneck. As I said before, I checked the async
operator, and there is a delay of about 80 ms before a record enters
asyncInvoke. I attached sample code in my previous email; can someone
explain why the delay is so large?

Thanks
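
For reference, the figures given earlier in this thread (capacity 100, parallelism 8, about 20 ms per RPC) can be turned into a rough throughput ceiling with Little's law. This is a minimal sketch, assuming every in-flight slot is kept busy; the object and value names are hypothetical and only for illustration:

```scala
object AsyncCeiling {
  // Figures quoted in this thread
  val capacity: Int = 100       // max in-flight requests per subtask
  val parallelism: Int = 8      // number of parallel subtasks
  val rpcMillis: Double = 20.0  // time per RPC request, in milliseconds

  // Little's law: throughput = in-flight requests / latency
  val perSubtask: Double = capacity * 1000.0 / rpcMillis // 5000.0 req/s
  val total: Double = perSubtask * parallelism           // 40000.0 req/s

  def main(args: Array[String]): Unit = {
    println(f"per subtask: $perSubtask%.0f req/s")
    println(f"whole job:   $total%.0f req/s")
  }
}
```

Both numbers sit well above the observed 3200 events per second and the 10000 req/sec the service can handle, which is consistent with Benchao's remark that the AsyncOperator configuration itself looks OK.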

On Mon, 6 Jul 2020, 23:31 Arvid Heise, <ar...@ververica.com> wrote:

> Hi Mark,
>
> Async wait operators cannot be chained to sources so the messages go
> through the network stack. Thus, having some latency is normal and cannot
> be avoided. It can be tuned though, but I don't think that this is the
> issue at hand as it should mostly impact latency and affect throughput
> less. Since external I/O calls are much more heavy weight than our internal
> communication, both the drop of throughput and the increase in latency are
> usually dwarfed by the external I/O call costs.
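>
> One knob commonly used for this kind of latency tuning is the network
> buffer timeout, which defaults to 100 ms. A minimal sketch, assuming that
> this is the tuning meant here; the 5 ms value, the object name, and the job
> name are illustrative only:
>
> ```scala
> import org.apache.flink.streaming.api.scala._
>
> object BufferTimeoutSketch {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // Assumption: flush partially filled network buffers after 5 ms
>     // instead of waiting for the default 100 ms.
>     env.setBufferTimeout(5L)
>     env.generateSequence(1, 100)
>       .map(_ => System.currentTimeMillis())
>       .print()
>     env.execute("buffer-timeout-sketch")
>   }
> }
> ```
>
> Lower values reduce latency at some cost in throughput; a value of 0
> flushes after every record.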
>
> Please try to increase the thread pool for akka as written in my previous
> email and report back.
>
> On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik <mark.zit...@gmail.com> wrote:
>
>> Hi Benchao,
>>
>> I ran this in my code:
>>
>> println(env.getConfig.getAutoWatermarkInterval)
>>
>> and got 200. I do understand how watermarks and the AsyncOperator work,
>> but I decided to write a simple test that measures the time it takes for
>> a record to reach the asyncInvoke method. It looks like that takes about
>> 80 ms, which is longer than the time it takes to get a response from my
>> micro-service.
>>
>> Code below:
>>
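>> // Imports were not part of the original post; the ones below are assumed
>> // (Flink Scala API, Akka, Typesafe Config). Executors is assumed to be
>> // Flink's org.apache.flink.runtime.concurrent.Executors.
>> import java.util.UUID
>> import java.util.concurrent.TimeUnit
>> import akka.actor.ActorSystem
>> import akka.stream.ActorMaterializer
>> import com.typesafe.config.ConfigFactory
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.runtime.concurrent.Executors
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
>> import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
>>
>> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {
>>
>>   implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())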
>>
>>   /*
>>   implicit val actorSystem = ActorSystem.apply("test", None, None, Some(executor))
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = actorSystem.dispatcher
>>
>>
>>   println(materializer.system.name)
>>   println("start")
>>   */
>> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>>
>>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>>   var actorSystem: ActorSystem = null
>>   var materializer: ActorMaterializer = null
>>   var executionContext: ExecutionContextExecutor = null
>>   //var akkaHttp: HttpExt = null
>>
>>   override def open(parameters: Configuration): Unit = {
>>     actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, Some(ConfigFactory.load("application.conf")), None, Some(executor))
>>     materializer = ActorMaterializer()(actorSystem)
>>     executionContext = actorSystem.dispatcher
>>     //akkaHttp = Http(actorSystem)
>>   }
>>
>>   override def close(): Unit = {
>>     actorSystem.terminate()
>>   }
>>
>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>     // The input is the emit timestamp produced by the upstream map
>>     val start = str.toLong
>>     val delta = System.currentTimeMillis() - start
>>     resultFuture.complete(Iterable((str, s"${delta}")))
>>   }
>> }
>>
>>
>> object Job {
>>   def main(args: Array[String]): Unit = {
>>     // set up the execution environment
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     //env.enableCheckpointing(10)
>>     env.setParallelism(1)
>>
>>     val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
>>     //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
>>     val x: DataStream[String] = someIntegers.map(_ => s"${System.currentTimeMillis()}")
>>     val resultStream: DataStream[(String, String)] =
>>       AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100) //.setParallelism(16)
>>       //AsyncDataStream.unorderedWait(data , new AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
>>     resultStream.print()
>>     println(env.getConfig.getAutoWatermarkInterval)
>>     env.execute("Flink Scala API Skeleton")
>>   }
>> }
>>
>> Is this normal behavior?
>>
>>
>> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li <libenc...@apache.org> wrote:
>>
>>> Hi Mark,
>>>
>>> According to your data, I think the config of AsyncOperator is OK.
>>> There is one more setting that might affect the throughput of
>>> AsyncOperator: the watermark. The unordered async operator still
>>> preserves the order between watermarks. Do you use event time in your
>>> job, and if so, what is the watermark interval?
>>>
>>> On Sun, Jul 5, 2020 at 7:44 PM Mark Zitnik <mark.zit...@gmail.com> wrote:
>>>
>>>> Hi Benchao
>>>>
>>>> The capacity is 100
>>>> Parallelism is 8
>>>> RPC request time is 20 ms
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Sun, 5 Jul 2020, 6:16 Benchao Li, <libenc...@apache.org> wrote:
>>>>
>>>>> Hi Mark,
>>>>>
>>>>> Could you give more details about your Flink job?
>>>>> - the capacity of AsyncDataStream
>>>>> - the parallelism of AsyncDataStream operator
>>>>> - the time per blocking RPC request
>>>>>
>>>>> On Sun, Jul 5, 2020 at 3:48 AM Mark Zitnik <mark.zit...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> In my Flink application I need to enrich data using
>>>>>> AsyncDataStream.unorderedWait, but I am getting poor performance. At
>>>>>> first I was using plain HTTP calls, but I have since switched to gRPC.
>>>>>> I am running on an 8-core node and getting a total of 3200 events per
>>>>>> second. The service I am calling is not fully utilized and can handle
>>>>>> up to 10000 req/sec.
>>>>>>
>>>>>> Flink job flow:
>>>>>> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~>
>>>>>> write to Kafka
>>>>>>
>>>>>> Using Akka gRPC code written in Scala.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best,
>>>>> Benchao Li
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
