Hi Andy,

Without being an expert on Akka's HTTP client, I think you should not
create a new `ActorSystem` for every call to `AsyncFunction#asyncInvoke`.
Instead, I would recommend implementing a `RichAsyncFunction` with a
transient field for the `ActorMaterializer`, which you initialize in the
`RichAsyncFunction#open` method. That way the `ActorMaterializer` is only
created on the `TaskManager` where the operator is executed, which solves
the serializability problem. It is also much more efficient, because you
don't create a new `ActorSystem` for every request.
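
A rough, untested sketch of what I mean, assuming Flink's Scala API ships
`org.apache.flink.streaming.api.scala.async.RichAsyncFunction` in your version
and that you stay on akka-http 10.1.x. The `Shipment` case class is only a
placeholder for your own type, and the 10 second shutdown timeout is an
arbitrary choice of mine:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, ResponseEntity}
import akka.stream.ActorMaterializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Placeholder for the Shipment type from the original message (assumption).
case class Shipment(id: String)

class AsyncHttpClient
    extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {

  // Transient: never serialized with the function, created per task in open().
  @transient private var system: ActorSystem = _
  @transient private var materializer: ActorMaterializer = _

  override def open(parameters: Configuration): Unit = {
    // Runs once per parallel operator instance on the TaskManager.
    system = ActorSystem("my-system")
    materializer = ActorMaterializer()(system)
  }

  override def close(): Unit = {
    // Shut the actor system down when the operator is disposed.
    if (system != null) {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  override def asyncInvoke(
      input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    // Reuse the instances created in open() instead of building new ones.
    implicit val sys: ActorSystem = system
    implicit val mat: ActorMaterializer = materializer
    implicit val ec: ExecutionContext = system.dispatcher

    Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json")).onComplete {
      case Success(res) => resultFuture.complete(Iterable(Right(res.entity)))
      case Failure(x)   => resultFuture.complete(Iterable(Left(x)))
    }
  }
}
```

The `timeout` override from your version can stay as it is; I left it out
here only to keep the sketch short.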

Cheers,
Till

On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:

> Hi guys,
>
> I’m trying to decide which HTTP client to use with Flink. So far I have
> tested scalaj and the Akka HTTP client, and both work fine in our current
> dev environment.
> With scalaj it is pretty straightforward, since it just makes an
> HTTP request with a timeout.
>
> The Akka HTTP client is a bit more complicated (I’m new to Scala and
> all), so I’m asking whether I’m doing it right by creating an
> AsyncFunction like this:
> ```
>
> class AsyncHttpClient( args: Array[String] = Array()) extends 
> AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>
>   override def asyncInvoke(input: Shipment, resultFuture: 
> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>
>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>     implicit val system = ActorSystem("my-system")
>     implicit val executionContext = system.dispatcher
>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>     val resultFutureRequested: Future[HttpResponse] =
>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>
>
>     resultFutureRequested.onComplete {
>       case Success(res) => {
>         resultFuture.complete(Iterable(Right(res.entity)))
>       }
>       case Failure(x)   => {
>         resultFuture.complete(Iterable(Left(x)))
>       }
>     }
>
>   }
>
>   override def timeout(input: Shipment, resultFuture: 
> ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     resultFuture.complete(
>       Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
>
> ```
> I notice that I have to create a bunch of implicit values inside the
> asyncInvoke method. I’m not sure whether I’m doing it right or just adding
> overhead. I did try to initialize them in the constructor of the class, but
> the compiler just throws a bunch of Not implemented Serializer errors.
>
> My lib:
>    "com.typesafe.akka" %% "akka-http" % "10.1.8",
>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>
> My flink:
> scala 2.12
> flink 1.70
>
>
>
> Any reply is appreciated!
>
> Thanks a bunch
>
> Andy,
>
>
>
>
