Hi Benchao,

I ran this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do understand how watermarks and the AsyncOperator work,
but I decided to run a simple test that measures the time it takes to
enter the asyncInvoke method. It looks like it takes about 80 ms, which is
longer than the time it takes to get a response from my micro-service.
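
For context, the value printed above is the automatic watermark emission interval in milliseconds. A minimal sketch of reading and changing it, assuming the Flink Scala DataStream API of the 1.10/1.11 era on the classpath (the object name and the 50 ms value are made up for illustration, not a recommendation):

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper object, only for illustration.
object WatermarkIntervalSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Switching to event time is what enables periodic watermarks;
    // the run reported in this mail printed 200 (ms) after this call.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    println(env.getConfig.getAutoWatermarkInterval)

    // Assumed value: emit watermarks every 50 ms instead.
    env.getConfig.setAutoWatermarkInterval(50L)
    println(env.getConfig.getAutoWatermarkInterval)
  }
}
```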

The test code is below:

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {

  implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None, Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher


  println(materializer.system.name)
  println("start")
  */
// redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com

  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
    actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString,
      Some(ConfigFactory.load("application.conf")), None, Some(executor))
    materializer = ActorMaterializer()(actorSystem)
    executionContext = actorSystem.dispatcher
    //akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
    actorSystem.terminate()
  }

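  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // The input string is the timestamp (ms) stamped by the upstream map
    val start = str.toLong
    // Time elapsed between the upstream map and entering asyncInvoke
    val delta = System.currentTimeMillis() - start
    resultFuture.complete(Iterable((str, s"${delta}")))
  }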
}


object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //env.enableCheckpointing(10)
    env.setParallelism(1)

    val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
    //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
    val x: DataStream[String] = someIntegers.map(_ => s"${System.currentTimeMillis()}")
    val resultStream: DataStream[(String, String)] =
      AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100) //.setParallelism(16)
    //AsyncDataStream.unorderedWait(data, new AsyncDatabaseRequest, 3L, TimeUnit.SECONDS)
    resultStream.print()
    println(env.getConfig.getAutoWatermarkInterval)
    env.execute("Flink Scala API Skeleton")
  }
}

Is this normal behavior?
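
As a rough sanity check on the numbers quoted further down in this thread (capacity 100, parallelism 8, about 20 ms per RPC, 3200 events per second observed), the ceiling that the async operator's capacity allows can be estimated with Little's law. This is only a back-of-the-envelope sketch: it assumes the capacity applies per parallel subtask and that every request takes exactly the stated latency, and the object and method names are made up for illustration.

```scala
// Hypothetical helper, not part of the job above.
object AsyncThroughputSketch {
  // Little's law: throughput = requests in flight / latency per request.
  // Upper bound when every subtask keeps `capacity` requests in flight.
  def maxThroughputPerSec(capacity: Int, parallelism: Int, rpcLatencyMs: Double): Double =
    capacity.toDouble * parallelism * 1000.0 / rpcLatencyMs

  // Average number of in-flight requests per subtask implied by an
  // observed throughput at the same latency.
  def impliedInFlightPerSubtask(observedPerSec: Double, parallelism: Int, rpcLatencyMs: Double): Double =
    observedPerSec * rpcLatencyMs / 1000.0 / parallelism

  def main(args: Array[String]): Unit = {
    println(maxThroughputPerSec(100, 8, 20.0))          // prints 40000.0
    println(impliedInFlightPerSubtask(3200.0, 8, 20.0)) // prints 8.0
  }
}
```

Under these assumptions the capacity setting alone would allow far more than 3200 events per second, and the observed rate corresponds to only about 8 requests in flight per subtask, which is consistent with the limit being somewhere other than the capacity.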


On Mon, Jul 6, 2020 at 2:45 PM Benchao Li <libenc...@apache.org> wrote:

> Hi Mark,
>
> According to your data, I think the config of the AsyncOperator is OK.
> There is one more setting that might affect the throughput of the
> AsyncOperator: the watermark.
> The unordered async operator still preserves the order between
> watermarks. Did you use event time in your job, and if yes, what is the
> watermark interval in your job?
>
> Mark Zitnik <mark.zit...@gmail.com> wrote on Sun, Jul 5, 2020 at 7:44 PM:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> RPC request time is 20 ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li, <libenc...@apache.org> wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time per blocking RPC request
>>>
>>> Mark Zitnik <mark.zit...@gmail.com> wrote on Sun, Jul 5, 2020 at 3:48 AM:
>>>
>>>> Hi
>>>>
>>>> In my Flink application I need to enrich data using
>>>> AsyncDataStream.unorderedWait, but I am getting poor performance. At the
>>>> beginning I was working with plain HTTP calls, but I have switched to
>>>> gRPC. I am running on an 8-core node and getting a total of 3200 events
>>>> per second. The service I am calling is not fully utilized and can
>>>> handle up to 10000 req/sec.
>>>>
>>>> Flink job flow:
>>>> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write
>>>> to Kafka
>>>>
>>>> Using Akka gRPC code written in Scala.
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>
