twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398248
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##########
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, 
code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a 
RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 
 Review comment:
   Is there a reason why a distributed cache is not supported in a 
`CepRuntimeContext`? We don't expose many context properties in SQL UDFs but 
the distributed cache apparently seems to be important.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to