Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r182445400
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
 ---
    @@ -16,35 +16,61 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.runtime
    +package org.apache.flink.table.runtime.join
     
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
     import org.apache.flink.util.Collector
     
     /**
    -  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times.
    +  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector
    +  * can also be used to count the number of emitted records and to do lazy output.
       */
     class CRowWrappingMultiOutputCollector() extends Collector[Row] {
     
       private var out: Collector[CRow] = _
    -  private val outCRow: CRow = new CRow()
    +  private val outCRow: CRow = new CRow(null, true)
    +  // times for collect
       private var times: Long = 0L
    +  // count how many records have been emitted
    +  private var emitCnt: Long = 0L
    +  // don't collect to downstream if set lazyOutput to true
    +  private var lazyOutput: Boolean = false
     
       def setCollector(collector: Collector[CRow]): Unit = this.out = collector
     
       def setChange(change: Boolean): Unit = this.outCRow.change = change
     
    +  def setRow(row: Row): Unit = this.outCRow.row = row
    +
    +  def getRow(): Row = this.outCRow.row
    +
       def setTimes(times: Long): Unit = this.times = times
     
    +  def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted
    +
    +  def getEmitCnt(): Long = emitCnt
    +
    +  def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput = lazyOutput
    +
       override def collect(record: Row): Unit = {
         outCRow.row = record
    -    var i: Long = 0L
    -    while (i < times) {
    -      out.collect(outCRow)
    -      i += 1
    +    if (!lazyOutput) {
    +      emitCnt += times
    +      var i: Long = 0L
    +      while (i < times) {
    +        out.collect(outCRow)
    +        i += 1
    +      }
         }
       }
     
    +  def reset(): Unit = {
    +    this.outCRow.change = true
    --- End diff --
    
    Remove this line. The `change` flag must be set explicitly after every `reset()` call anyway.


---

Reply via email to