Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131761767
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala
 ---
    @@ -18,42 +18,54 @@
     
     package org.apache.flink.table.runtime
     
    -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
     import org.slf4j.LoggerFactory
     
     /**
    -  * MapRunner with [[CRow]] output.
    +  * ProcessRunner with [[CRow]] output.
       */
    -class CRowOutputMapRunner(
    +class CRowOutputProcessRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[CRow])
    -  extends RichMapFunction[Any, CRow]
    +  extends ProcessFunction[Any, CRow]
       with ResultTypeQueryable[CRow]
    -  with Compiler[MapFunction[Any, Row]] {
    +  with Compiler[ProcessFunction[Any, Row]] {
     
       val LOG = LoggerFactory.getLogger(this.getClass)
     
    -  private var function: MapFunction[Any, Row] = _
    -  private var outCRow: CRow = _
    +  private var function: ProcessFunction[Any, Row] = _
    +  private var cRowWrapper: CRowWrappingCollector = _
     
       override def open(parameters: Configuration): Unit = {
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, 
code)
         LOG.debug("Instantiating MapFunction.")
         function = clazz.newInstance()
    -    outCRow = new CRow(null, true)
    +
    +    this.cRowWrapper = new CRowWrappingCollector()
    +    this.cRowWrapper.setChange(true)
       }
     
    -  override def map(in: Any): CRow = {
    -    outCRow.row = function.map(in)
    -    outCRow
    +  override def processElement(
    +      in: Any,
    +      ctx: ProcessFunction[Any, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    // remove timestamp from stream record
    +    val tc = out.asInstanceOf[TimestampedCollector[_]]
    --- End diff --
    
    Removing the timestamp is not strictly required, but it reduces the 
serialization overhead by one Long value.
    I added this to most functions that introduce a timestamp (ProcessFunction), 
but I would also be OK with removing it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to