[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311792#comment-16311792 ]
ASF GitHub Bot commented on FLINK-7797:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5140#discussion_r159700432

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
    @@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
         * @param removeLeft whether to remove the left rows
         */
       private def removeExpiredRows(
    +      collector: EmitAwareCollector,
           expirationTime: Long,
    -      rowCache: MapState[Long, JList[Row]],
    +      rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
           timerState: ValueState[Long],
           ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
           removeLeft: Boolean): Unit = {

    -    val keysIterator = rowCache.keys().iterator()
    +    val iterator = rowCache.iterator()

         var earliestTimestamp: Long = -1L
    -    var rowTime: Long = 0L

         // We remove all expired keys and do not leave the loop early.
         // Hence, we do a full pass over the state.
    -    while (keysIterator.hasNext) {
    -      rowTime = keysIterator.next
    +    while (iterator.hasNext) {
    +      val entry = iterator.next
    +      val rowTime = entry.getKey
           if (rowTime <= expirationTime) {
    -        keysIterator.remove()
    +        if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
    --- End diff --

    Refactor to

    ```
    if (removeLeft &&
        (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
      val rows = entry.getValue
      var i = 0
      while (i < rows.size) {
        val tuple = rows.get(i)
        if (!tuple.f1) {
          // Emit a null padding result if the row has never been successfully joined.
          collector.collect(paddingUtil.padLeft(tuple.f0))
        }
        i += 1
      }
    } else if (!removeLeft &&
        (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
      val rows = entry.getValue
      var i = 0
      while (i < rows.size) {
        val tuple = rows.get(i)
        if (!tuple.f1) {
          // Emit a null padding result if the row has never been successfully joined.
          collector.collect(paddingUtil.padRight(tuple.f0))
        }
        i += 1
      }
    }
    iterator.remove()
    ```

    to reduce the number of conditions.


> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
>                 Key: FLINK-7797
>                 URL: https://issues.apache.org/jira/browse/FLINK-7797
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)