Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182445400 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala --- @@ -16,35 +16,61 @@ * limitations under the License. */ -package org.apache.flink.table.runtime +package org.apache.flink.table.runtime.join import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.apache.flink.util.Collector /** - * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. + * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector + * can also used to count output record number and do lazy output. */ class CRowWrappingMultiOutputCollector() extends Collector[Row] { private var out: Collector[CRow] = _ - private val outCRow: CRow = new CRow() + private val outCRow: CRow = new CRow(null, true) + // times for collect private var times: Long = 0L + // count how many records have been emitted + private var emitCnt: Long = 0L + // don't collect to downstream if set lazyOutput to true + private var lazyOutput: Boolean = false def setCollector(collector: Collector[CRow]): Unit = this.out = collector def setChange(change: Boolean): Unit = this.outCRow.change = change + def setRow(row: Row): Unit = this.outCRow.row = row + + def getRow(): Row = this.outCRow.row + def setTimes(times: Long): Unit = this.times = times + def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted + + def getEmitCnt(): Long = emitCnt + + def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput = lazyOutput + override def collect(record: Row): Unit = { outCRow.row = record - var i: Long = 0L - while (i < times) { - out.collect(outCRow) - i += 1 + if (!lazyOutput) { + emitCnt += times + var i: Long = 0L + while (i < times) { + out.collect(outCRow) + i += 1 + } } } + def reset(): Unit = { + this.outCRow.change = true --- End 
diff -- Remove this line. The `change` flag must be set explicitly after every `reset()` call anyway, so setting it inside `reset()` is redundant.
---