twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237399713
########## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala ########## @@ -31,30 +31,27 @@ import org.apache.flink.types.Row class IterativeConditionRunner( name: String, code: String) - extends IterativeCondition[Row] - with Compiler[IterativeCondition[Row]] + extends RichIterativeCondition[Row] + with Compiler[RichIterativeCondition[Row]] with Logging { - @transient private var function: IterativeCondition[Row] = _ + @transient private var function: RichIterativeCondition[Row] = _ - def init(): Unit = { + override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code") - // We cannot get user's classloader currently, see FLINK-6938 for details - val clazz = compile(Thread.currentThread().getContextClassLoader, name, code) + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating IterativeCondition.") function = clazz.newInstance() - // TODO add logic for opening and closing the function once it can be a RichFunction + FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) + FunctionUtils.openFunction(function, parameters) } override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = { function.filter(value, ctx) } - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { - in.defaultReadObject() - if (function == null) { - init() - } + override def close(): Unit = { + super.close() + function.close() Review comment: Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` instead? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services