Dear Flink community,

I started using async functions in Scala with Flink 1.2.0 to retrieve enrichment data from a MariaDB database. At first I used the classic JDBC driver (org.mariadb.jdbc), and it worked as expected.
Because JDBC is blocking, I am now trying to use this library: https://github.com/mauricio/postgresql-async/tree/master/mysql-async which promises to offer a subset of the features in a non-blocking fashion. Sadly, I have not been able to get it working. The async function code follows.

    object AsyncEnricher {
      case class OutputType(field1: FieldType, field2: FieldType)
    }

    class AsyncEnricher(configuration: MariaDBConfig)
        extends AsyncFunction[InputType, OutputType]
        with Serializable
        with AutoCloseable
        with LazyLogging {

      private val queryString = s"SELECT <column> FROM [table] WHERE <column_name> = <value>;"

      implicit lazy val executor = ExecutionContext.fromExecutor(Executors.directExecutor())

      private lazy val mariaDBClient: Connection = {
        val config = createConfiguration(configuration)
        val connection = new MySQLConnection(config)
        Await.result(connection.connect, 5 seconds)
      }

      override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = {
        val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch))

        queryResult.map(_.rows) onSuccess {
          case Some(resultSet) =>
            Try {
              resultSet.head(0).asInstanceOf[FieldType]
            } match {
              case Success(value) => collector.collect(Iterable(OutputType(value, value)))
              case Failure(e) => logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
            }
          case _ => logger.error(s"value not found: $queryString executed")
        }

        queryResult onFailure {
          case e: Throwable => logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed")
        }
      }

      override def close(): Unit = {
        Try(mariaDBClient.disconnect).recover {
          case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}")
        }
      }
    }

The stack trace follows.

    TimerException{java.lang.IllegalStateException: Timer service is shut down}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Timer service is shut down
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
        ... 7 more

    java.lang.NullPointerException
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)

I think the problem involves connection.connect, which returns a Future and therefore requires the Await. This differs from the JDBC driver, which worked like a charm. I also tried moving the Await out of the lazy val.

I look forward to your opinion. Thank you so much in advance.
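To make clearer what I am after, here is a minimal sketch that uses only the Scala standard library. SketchCollector and lookup are hypothetical stand-ins (for Flink's AsyncCollector and for the mysql-async query call, respectively), not the real APIs. It shows a callback that completes the collector in every branch, including failures, with no Await inside asyncInvoke. Whether this has anything to do with the exceptions above is only an assumption on my side.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for Flink's AsyncCollector: it has to be completed
// exactly once, either with a result collection or with an error.
final class SketchCollector[A] {
  private val promise = Promise[Iterable[A]]()
  def collect(result: Iterable[A]): Unit = promise.success(result)
  def fail(error: Throwable): Unit = promise.failure(error)
  def future: Future[Iterable[A]] = promise.future
}

object AsyncSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical non-blocking lookup standing in for the database query.
  def lookup(key: String): Future[Option[String]] = Future {
    if (key == "boom") throw new IllegalStateException("connection lost")
    Map("a" -> "alpha").get(key)
  }

  // Every branch of the callback completes the collector, so a pending
  // request is never left unanswered when the lookup fails or finds nothing.
  def asyncInvoke(key: String, collector: SketchCollector[String]): Unit =
    lookup(key).onComplete {
      case Success(Some(value)) => collector.collect(Iterable(value))
      case Success(None)        => collector.collect(Iterable.empty)
      case Failure(e)           => collector.fail(e)
    }
}
```

In this sketch a missing row yields an empty result and a failed lookup is passed on as an error, instead of only being logged as in my code above.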
Andrea

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.