twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399713
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##########
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, 
code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a 
RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): 
Boolean = {
     function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    super.close()
+    function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to