Hi Arvid,

The HTTP client is not my bottleneck. As I said before, I checked the async path, and there is a delay of about 80 ms before a record enters asyncInvoke. Can someone explain why the delay is so large? I attached sample code in my previous email.
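One possible explanation, offered as an assumption rather than something confirmed in this thread: when the async operator is not chained to its upstream operator, records are serialized into network buffers. At a low record rate a buffer is shipped only when Flink's periodic flush fires, and the flush interval is the buffer timeout (100 ms by default, adjustable with `StreamExecutionEnvironment.setBufferTimeout`). The sketch below is a toy model of that behavior, not Flink code; `BufferFlushModel`, `nextFlush` and `waitMs` are hypothetical names.

```scala
// Toy model, NOT Flink code. Assumptions: records reach the next task only when
// a periodic flusher ships the output buffer every `flushIntervalMs`, flushes
// happen at t = 0, T, 2T, ..., and the buffer never fills up before a flush
// (plausible for a trickle of tiny records).
object BufferFlushModel {

  /** First flush strictly after `emitMs`. */
  def nextFlush(emitMs: Long, flushIntervalMs: Long): Long =
    (emitMs / flushIntervalMs + 1) * flushIntervalMs

  /** How long a record emitted at `emitMs` sits in the buffer before being shipped. */
  def waitMs(emitMs: Long, flushIntervalMs: Long): Long =
    nextFlush(emitMs, flushIntervalMs) - emitMs

  def main(args: Array[String]): Unit = {
    val flushIntervalMs = 100L // assumed: Flink's default buffer timeout of 100 ms
    // Hypothetical emit times (ms) relative to the flusher's schedule.
    for (emit <- Seq(5L, 20L, 50L, 99L, 100L)) {
      val shipped = nextFlush(emit, flushIntervalMs)
      println(s"emitted at $emit ms -> shipped at $shipped ms (waited ${shipped - emit} ms)")
    }
  }
}
```

Under this model a record can wait anywhere from just above 0 ms up to the full flush interval, so a delay of tens of milliseconds before asyncInvoke would not be surprising. Lowering the buffer timeout (for example `env.setBufferTimeout(5)`) would trade some throughput for lower latency.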
Thanks

On Mon, 6 Jul 2020, 23:31 Arvid Heise, <ar...@ververica.com> wrote:
> Hi Mark,
>
> Async wait operators cannot be chained to sources, so the messages go
> through the network stack. Thus, some latency is normal and cannot
> be avoided. It can be tuned, though, but I don't think that this is the
> issue at hand, as it should mostly impact latency and affect throughput
> less. Since external I/O calls are much more heavyweight than our internal
> communication, both the drop in throughput and the increase in latency are
> usually dwarfed by the cost of the external I/O calls.
>
> Please try to increase the thread pool for Akka as written in my previous
> email and report back.
>
> On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik <mark.zit...@gmail.com> wrote:
>
>> Hi Benchao,
>>
>> I have run this in the code:
>>
>> println(env.getConfig.getAutoWatermarkInterval)
>>
>> and got 200. I do fully understand how watermarks and the AsyncOperator
>> work, but I have decided to make a simple test that measures the time it
>> takes to enter the asyncInvoke method, and it looks like it takes about
>> 80 ms, which is longer than the time it takes to get a response from my
>> micro-service.
>>
>> Code below:
>>
>> // Imports added for completeness; package names assume Flink 1.11 and Akka 2.5/2.6.
>> import java.util.UUID
>> import java.util.concurrent.TimeUnit
>>
>> import akka.actor.ActorSystem
>> import akka.stream.ActorMaterializer
>> import com.typesafe.config.ConfigFactory
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.runtime.concurrent.Executors
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
>> import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
>>
>> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {
>>
>>   implicit lazy val executor: ExecutionContext =
>>     ExecutionContext.fromExecutor(Executors.directExecutor())
>>
>>   /*
>>   implicit val actorSystem = ActorSystem.apply("test", None, None, Some(executor))
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = actorSystem.dispatcher
>>
>>   println(materializer.system.name)
>>   println("start")
>>   */
>>   // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>>   var actorSystem: ActorSystem = null
>>   var materializer: ActorMaterializer = null
>>   var executionContext: ExecutionContextExecutor = null
>>   //var akkaHttp: HttpExt = null
>>
>>   // One actor system per subtask, created when the function is opened.
>>   override def open(parameters: Configuration): Unit = {
>>     actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString,
>>       Some(ConfigFactory.load("application.conf")), None, Some(executor))
>>     materializer = ActorMaterializer()(actorSystem)
>>     executionContext = actorSystem.dispatcher
>>     //akkaHttp = Http(actorSystem)
>>   }
>>
>>   override def close(): Unit = {
>>     actorSystem.terminate()
>>   }
>>
>>   // The record's payload is the wall-clock time (ms) at which the upstream map
>>   // stamped it, so delta is how long the record took to reach asyncInvoke.
>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>     val start = str.toLong
>>     val delta = System.currentTimeMillis() - start
>>     resultFuture.complete(Iterable((str, s"${delta}")))
>>   }
>> }
>>
>> object Job {
>>   def main(args: Array[String]): Unit = {
>>     // set up the execution environment
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     //env.enableCheckpointing(10)
>>     env.setParallelism(1)
>>
>>     val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
>>     //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
>>     // Stamp each record with the current time before it reaches the async operator.
>>     val x: DataStream[String] = someIntegers.map(_ => s"${System.currentTimeMillis()}")
>>     // Unordered async wait: timeout 10 ms, capacity 100.
>>     val resultStream: DataStream[(String, String)] =
>>       AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
>>     //AsyncDataStream.unorderedWait(data, new AsyncDatabaseRequest, 3L, TimeUnit.SECONDS)
>>     resultStream.print()
>>     println(env.getConfig.getAutoWatermarkInterval)
>>     env.execute("Flink Scala API Skeleton")
>>   }
>> }
>>
>> Is this normal behavior?
>>
>>
>> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li <libenc...@apache.org> wrote:
>>
>>> Hi Mark,
>>>
>>> According to your data, I think the config of AsyncOperator is OK.
>>> There is one more config that might affect the throughput of
>>> AsyncOperator: the watermark.
>>> Because the unordered async operator still keeps the order between
>>> watermarks, did you use event time in your job, and if yes, what's the
>>> watermark interval in your job?
>>>
>>> On Sun, 5 Jul 2020 at 7:44 PM, Mark Zitnik <mark.zit...@gmail.com> wrote:
>>>
>>>> Hi Benchao,
>>>>
>>>> The capacity is 100.
>>>> The parallelism is 8.
>>>> Each RPC request takes 20 ms.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Sun, 5 Jul 2020, 6:16 Benchao Li, <libenc...@apache.org> wrote:
>>>>
>>>>> Hi Mark,
>>>>>
>>>>> Could you give more details about your Flink job?
>>>>> - the capacity of AsyncDataStream
>>>>> - the parallelism of the AsyncDataStream operator
>>>>> - the time per blocking RPC request
>>>>>
>>>>> On Sun, 5 Jul 2020 at 3:48 AM, Mark Zitnik <mark.zit...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> In my Flink application I need to enrich data using
>>>>>> AsyncDataStream.unorderedWait, but I am getting poor performance.
>>>>>> At the beginning I was just working with HTTP calls, but I have
>>>>>> switched to gRPC. I am running on an 8-core node and getting a total
>>>>>> of 3200 events per second. The service I am calling is not fully
>>>>>> utilized and can handle up to 10000 req/sec.
>>>>>>
>>>>>> Flink job flow:
>>>>>> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~>
>>>>>> write to Kafka
>>>>>>
>>>>>> I am using Akka gRPC code written in Scala.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best,
>>>>> Benchao Li
>>>>>
>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
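Benchao's conclusion that the AsyncOperator configuration is fine can be checked with a back-of-the-envelope calculation. The sketch below applies Little's law to the numbers Mark reported (capacity 100, parallelism 8, 20 ms per RPC, 3200 events/s), under the simplifying assumption that each request holds exactly one capacity slot for one RPC round trip. `AsyncCapacityCheck` and its methods are hypothetical helpers, not Flink API.

```scala
// Back-of-the-envelope check using Little's law (L = lambda * W), assuming
// every request occupies one slot of the async operator's capacity for the
// whole RPC round trip and nothing else limits the operator.
object AsyncCapacityCheck {

  /** Upper bound on requests/second: parallelism * capacity slots, each freed after latencyMs. */
  def maxThroughputPerSec(parallelism: Int, capacity: Int, latencyMs: Double): Double =
    parallelism * capacity * 1000.0 / latencyMs

  /** Average number of in-flight requests per subtask at an observed total rate. */
  def inFlightPerSubtask(eventsPerSec: Double, latencyMs: Double, parallelism: Int): Double =
    eventsPerSec * (latencyMs / 1000.0) / parallelism

  def main(args: Array[String]): Unit = {
    val (parallelism, capacity, latencyMs) = (8, 100, 20.0) // figures Mark reported
    val observed = 3200.0                                     // events/second Mark reported

    val bound = maxThroughputPerSec(parallelism, capacity, latencyMs)
    val inFlight = inFlightPerSubtask(observed, latencyMs, parallelism)
    println(f"upper bound: $bound%.0f req/s")
    println(f"in flight per subtask at $observed%.0f ev/s: $inFlight%.1f of $capacity slots")
  }
}
```

With these numbers the operator could in principle sustain about 40,000 requests per second, while the observed 3200 events/s corresponds to only about 8 in-flight requests per subtask out of 100 slots. That suggests the limit lies outside the capacity setting, for example in the client-side thread pool Arvid mentions.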