[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16318351#comment-16318351 ] ASF GitHub Bot commented on FLINK-7797: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5140 > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316873#comment-16316873 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5140 Thanks for the update and excellent work. PR is good to merge :-) > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314404#comment-16314404 ] ASF GitHub Bot commented on FLINK-7797: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5140 Hi @fhueske, thanks for your concrete suggestions! IMO, the refactorings in `TimeBoundedStreamJoin` are quite reasonable, while the refactoring for `createNegativeWindowSizeJoin()` may not be so significant as the negative window size should be taken as an exception. Anyway, I've applied them for better efficiency. 😄 Thanks, Xingcan > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311794#comment-16311794 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159719430

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -183,23 +190,48 @@ class DataStreamWindowJoin(
     }
   }

-  def createEmptyJoin(
+  def createNegativeWindowSizeJoin(
--- End diff --

I think we can make this even more efficient if we implement this as:

```
def createNegativeWindowSizeJoin(
    joinType: JoinType,
    leftInput: DataStream[CRow],
    rightInput: DataStream[CRow],
    leftArity: Int,
    rightArity: Int,
    returnType: TypeInformation[CRow]): DataStream[CRow] = {

  // we filter all records instead of adding an empty source to preserve the watermarks
  val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftP = leftInput.getParallelism
  val rightP = rightInput.getParallelism

  joinType match {
    case JoinType.INNER =>
      leftInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP))
    case JoinType.LEFT_OUTER =>
      leftInput.map(leftPadder).name("Left Outer Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP))
    case JoinType.RIGHT_OUTER =>
      leftInput.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Right Outer Join").setParallelism(rightP))
    case JoinType.FULL_OUTER =>
      leftInput.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
  }
}
```

We also need to make `OuterJoinPaddingUtil` extend `java.io.Serializable` for this.
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311793#comment-16311793 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159695882

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -196,35 +181,38 @@ abstract class TimeBoundedStreamJoin(
       if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
         val rightRows = rightEntry.getValue
         var i = 0
-        var markEmitted = false
+        var entryUpdated = false
         while (i < rightRows.size) {
-          joinCollector.resetThisTurn()
+          joinCollector.reset()
           val tuple = rightRows.get(i)
           joinFunction.join(leftRow, tuple.f0, joinCollector)
-          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
-            if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-              // Mark the right row as being successfully joined and emitted.
-              tuple.f1 = true
-              markEmitted = true
+          if (joinCollector.emitted) {
--- End diff --

change to

```
emitted = emitted || joinCollector.emitted
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
  if (!tuple.f1 && joinCollector.emitted) {
    // Mark the right row as being successfully joined and emitted.
    tuple.f1 = true
    entryUpdated = true
  }
}
```

to avoid a condition for inner and left joins
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311792#comment-16311792 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159700432

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
     * @param removeLeft whether to remove the left rows
     */
   private def removeExpiredRows(
+      collector: EmitAwareCollector,
       expirationTime: Long,
-      rowCache: MapState[Long, JList[Row]],
+      rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
       timerState: ValueState[Long],
       ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
       removeLeft: Boolean): Unit = {

-    val keysIterator = rowCache.keys().iterator()
+    val iterator = rowCache.iterator()

     var earliestTimestamp: Long = -1L
-    var rowTime: Long = 0L

     // We remove all expired keys and do not leave the loop early.
     // Hence, we do a full pass over the state.
-    while (keysIterator.hasNext) {
-      rowTime = keysIterator.next
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val rowTime = entry.getKey
       if (rowTime <= expirationTime) {
-        keysIterator.remove()
+        if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
--- End diff --

Refactor to

```
if (removeLeft &&
  (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padLeft(tuple.f0))
    }
    i += 1
  }
} else if (!removeLeft &&
  (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padRight(tuple.f0))
    }
    i += 1
  }
}
iterator.remove()
```

to reduce the number of conditions.
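For illustration, the expiry rule discussed in this review comment can be sketched without any Flink dependency. This is only a sketch under stated assumptions: `ExpirySketch`, `Cached`, `removeExpired`, and the `preserve` and `pad` parameters are hypothetical names, a plain mutable map stands in for the `MapState` row cache, and rows are modelled as `Vector[Any]`. Rows whose timestamp is at or before the expiration time are dropped, and those that never joined are returned null-padded when their side must be preserved.

```scala
import scala.collection.mutable

// Hypothetical, dependency-free model of the row cache; not Flink's real state API.
object ExpirySketch {
  type Row = Vector[Any]

  /** One cached row plus a flag telling whether it was ever joined. */
  final case class Cached(row: Row, joined: Boolean)

  /**
   * Removes every entry whose timestamp is <= expirationTime and returns the
   * padded rows to emit. Padding happens only if `preserve` is true (the cache
   * belongs to an outer side) and only for rows that never joined.
   */
  def removeExpired(
      cache: mutable.Map[Long, List[Cached]],
      expirationTime: Long,
      preserve: Boolean,
      pad: Row => Row): List[Row] = {
    // Full pass over the keys, as in the reviewed code; sorted for a stable output order.
    val expiredKeys = cache.keys.filter(_ <= expirationTime).toList.sorted
    val padded =
      if (preserve) {
        expiredKeys.flatMap(k => cache(k).filterNot(_.joined).map(c => pad(c.row)))
      } else {
        Nil
      }
    expiredKeys.foreach(k => cache.remove(k))
    padded
  }
}
```

In terms of the suggestion above, `preserve` would be true for the left cache under LEFT or FULL OUTER joins and for the right cache under RIGHT or FULL OUTER joins.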
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311789#comment-16311789 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159698104

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
     * Remove the expired rows. Register a new timer if the cache still holds valid rows
     * after the cleaning up.
     *
-    * @param collector      the collector to emit results
     * @param expirationTime the expiration time for this cache
     * @param rowCache       the row cache
     * @param timerState     timer state for the opposite stream
     * @param ctx            the context to register the cleanup timer
     * @param removeLeft     whether to remove the left rows
     */
   private def removeExpiredRows(
-      collector: Collector[Row],
--- End diff --

Why did you change the type? `EmitAwareCollector` is a `Collector[Row]`.
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311790#comment-16311790 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159697707

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
     * Remove the expired rows. Register a new timer if the cache still holds valid rows
     * after the cleaning up.
     *
-    * @param collector      the collector to emit results
--- End diff --

Don't remove parameter documentation for `collector`
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311791#comment-16311791 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159697015 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -288,34 +276,37 @@ abstract class TimeBoundedStreamJoin( if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) { val leftRows = leftEntry.getValue var i = 0 - var markEmitted = false + var entryUpdated = false while (i < leftRows.size) { -joinCollector.resetThisTurn() +joinCollector.reset() val tuple = leftRows.get(i) joinFunction.join(tuple.f0, rightRow, joinCollector) -if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { - if (!tuple.f1 && joinCollector.everEmittedThisTurn) { -// Mark the left row as being successfully joined and emitted. -tuple.f1 = true -markEmitted = true +if (joinCollector.emitted) { --- End diff -- same as for `processElement1()` > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307886#comment-16307886 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5140

Hi @fhueske, thanks for your review. I've made the following changes to the PR.

1. Fixes the "wrong sides" problem in `TimeBoundedStreamJoin`.
2. Adds the logic for outer-joins with negative window size in `DataStreamWindowJoin`.
3. Refines the `EmitAwareCollector` according to your suggestions.
4. Uses a separate class `OuterJoinPaddingUtil` to deal with results padding.
5. Adds some test cases to `JoinITCase` and `JoinHarnessTest`.
6. Other minor changes to attribute/method/class names.

Thanks, Xingcan
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306321#comment-16306321 ] ASF GitHub Bot commented on FLINK-7797: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159069304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin( if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) { val rightRows = rightEntry.getValue var i = 0 + var markEmitted = false while (i < rightRows.size) { -joinFunction.join(leftRow, rightRows.get(i), cRowWrapper) +joinCollector.resetThisTurn() +val tuple = rightRows.get(i) +joinFunction.join(leftRow, tuple.f0, joinCollector) +if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) { + if (!tuple.f1 && joinCollector.everEmittedThisTurn) { +// Mark the right row as being successfully joined and emitted. +tuple.f1 = true +markEmitted = true + } +} i += 1 } + if (markEmitted) { +// Write back the edited entry (mark emitted) for the right cache. +rightEntry.setValue(rightRows) + } } if (rightTime <= rightExpirationTime) { + if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { --- End diff -- Yes, I should have added a harness test for that. > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306318#comment-16306318 ] ASF GitHub Bot commented on FLINK-7797: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159069074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Collector to track whether there's a joined result. + */ +class JoinAwareCollector extends Collector[Row]{ + + private var emitted = false + private var emittedThisTurn = false + private var innerCollector: Collector[CRow] = _ + private val cRow: CRow = new CRow() --- End diff -- I'll add a function to set this value in the Collector. 
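As an illustration of the collector discussed in this thread, here is a minimal, dependency-free sketch of a wrapper that remembers whether anything was emitted since the last reset. All names (`Sink`, `EmitAwareSink`, `EmitAwareSinkDemo`) are hypothetical stand-ins and do not reproduce the PR's `JoinAwareCollector` / `EmitAwareCollector`; the real class wraps rows into `CRow` before forwarding, which is omitted here.

```scala
// Hypothetical minimal collector interface; stands in for Flink's Collector.
trait Sink[T] {
  def collect(record: T): Unit
}

// Forwards every record to `inner` and remembers that something was emitted.
final class EmitAwareSink[T](inner: Sink[T]) extends Sink[T] {
  private var emittedFlag = false

  /** True if at least one record was forwarded since the last reset(). */
  def emitted: Boolean = emittedFlag

  /** Clears the flag; meant to be called before each join attempt. */
  def reset(): Unit = {
    emittedFlag = false
  }

  override def collect(record: T): Unit = {
    emittedFlag = true
    inner.collect(record)
  }
}

object EmitAwareSinkDemo {
  /** Returns (flag after one emit, flag after reset, records seen by the inner sink). */
  def run(): (Boolean, Boolean, List[String]) = {
    val buffer = scala.collection.mutable.ListBuffer.empty[String]
    val sink = new EmitAwareSink[String](new Sink[String] {
      override def collect(record: String): Unit = {
        buffer += record
      }
    })
    sink.reset()
    sink.collect("joined")
    val afterEmit = sink.emitted
    sink.reset()
    val afterReset = sink.emitted
    (afterEmit, afterReset, buffer.toList)
  }
}
```

A caller would reset the sink before each call to the join function and read `emitted` afterwards to decide whether the cached row should be marked as joined.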
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305966#comment-16305966 ] ASF GitHub Bot commented on FLINK-7797: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159023858 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -142,50 +143,47 @@ class DataStreamWindowJoin( s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " + s"join: (${joinSelectionToString(schema.relDataType)})" -joinType match { - case JoinRelType.INNER => -if (relativeWindowSize < 0) { - LOG.warn(s"The relative window size $relativeWindowSize is negative," + -" please check the join conditions.") - createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo) -} else { - if (isRowTime) { -createRowTimeInnerJoin( - leftDataStream, - rightDataStream, - returnTypeInfo, - joinOpName, - joinFunction.name, - joinFunction.code, - leftKeys, - rightKeys -) - } else { -createProcTimeInnerJoin( - leftDataStream, - rightDataStream, - returnTypeInfo, - joinOpName, - joinFunction.name, - joinFunction.code, - leftKeys, - rightKeys -) - } -} - case JoinRelType.FULL => -throw new TableException( - "Full join between stream and stream is not supported yet.") - case JoinRelType.LEFT => -throw new TableException( - "Left join between stream and stream is not supported yet.") - case JoinRelType.RIGHT => -throw new TableException( - "Right join between stream and stream is not supported yet.") +val flinkJoinType = joinType match { + case JoinRelType.INNER => JoinType.INNER + case JoinRelType.FULL => JoinType.FULL_OUTER + case JoinRelType.LEFT => JoinType.LEFT_OUTER + case JoinRelType.RIGHT => JoinType.RIGHT_OUTER +} + +if (relativeWindowSize < 0) { + LOG.warn(s"The relative window size $relativeWindowSize is negative," + +" please check the 
join conditions.") + createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo) --- End diff -- Yes, you are right. I'll add this part.
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305847#comment-16305847 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159009795

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -62,15 +65,23 @@ abstract class TimeBoundedStreamInnerJoin(
     with Compiler[FlatJoinFunction[Row, Row, Row]]
     with Logging {

-  private var cRowWrapper: CRowWrappingCollector = _
+  private val leftArity = leftType.getArity
+  private val rightArity = rightType.getArity
+  private val resultArity = leftArity + rightArity
+
+  // two reusable padding results
+  private val leftNullPaddingResult = new Row(resultArity)
--- End diff --

I think we can move the code to generate padded results into a util class that can be reused by other joins.
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305855#comment-16305855 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012517

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -241,17 +288,66 @@ abstract class TimeBoundedStreamInnerJoin(
       if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
         val leftRows = leftEntry.getValue
         var i = 0
+        var markEmitted = false
         while (i < leftRows.size) {
-          joinFunction.join(leftRows.get(i), rightRow, cRowWrapper)
+          joinCollector.resetThisTurn()
+          val tuple = leftRows.get(i)
+          joinFunction.join(tuple.f0, rightRow, joinCollector)
+          if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
+            if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+              // Mark the left row as being successfully joined and emitted.
+              tuple.f1 = true
+              markEmitted = true
+            }
+          }
           i += 1
         }
+        if (markEmitted) {
+          // Write back the edited entry (mark emitted) for the right cache.
+          leftEntry.setValue(leftRows)
+        }
       }

       if (leftTime <= leftExpirationTime) {
+        if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
--- End diff --

`JoinType.RIGHT_OUTER` should be `JoinType.LEFT_OUTER`
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305851#comment-16305851 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r158737260 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -142,50 +143,47 @@ class DataStreamWindowJoin( s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " + s"join: (${joinSelectionToString(schema.relDataType)})" -joinType match { - case JoinRelType.INNER => -if (relativeWindowSize < 0) { - LOG.warn(s"The relative window size $relativeWindowSize is negative," + -" please check the join conditions.") - createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo) -} else { - if (isRowTime) { -createRowTimeInnerJoin( - leftDataStream, - rightDataStream, - returnTypeInfo, - joinOpName, - joinFunction.name, - joinFunction.code, - leftKeys, - rightKeys -) - } else { -createProcTimeInnerJoin( - leftDataStream, - rightDataStream, - returnTypeInfo, - joinOpName, - joinFunction.name, - joinFunction.code, - leftKeys, - rightKeys -) - } -} - case JoinRelType.FULL => -throw new TableException( - "Full join between stream and stream is not supported yet.") - case JoinRelType.LEFT => -throw new TableException( - "Left join between stream and stream is not supported yet.") - case JoinRelType.RIGHT => -throw new TableException( - "Right join between stream and stream is not supported yet.") +val flinkJoinType = joinType match { + case JoinRelType.INNER => JoinType.INNER + case JoinRelType.FULL => JoinType.FULL_OUTER + case JoinRelType.LEFT => JoinType.LEFT_OUTER + case JoinRelType.RIGHT => JoinType.RIGHT_OUTER +} + +if (relativeWindowSize < 0) { + LOG.warn(s"The relative window size $relativeWindowSize is negative," + +" please check 
the join conditions.") + createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo) --- End diff -- Empty outer joins need to be handled differently than empty inner joins because the records of the outer side(s) must be preserved and padded with nulls. Hence, we need to pass the join type and the generated code. > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
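To make the point about empty outer joins concrete, here is a small sketch of the expected results when the window is negative and no pair of rows can ever match. It assumes rows are plain `Vector[Any]` values and operates on in-memory sequences instead of streams; `NegativeWindowSketch`, `JoinKind`, and `join` are hypothetical names, not part of the PR.

```scala
// Hypothetical, collection-based model; the real code operates on DataStream[CRow].
object NegativeWindowSketch {
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind

  type Row = Vector[Any]

  /** Result of a join in which no left row ever matches a right row. */
  def join(
      kind: JoinKind,
      left: Seq[Row],
      right: Seq[Row],
      leftArity: Int,
      rightArity: Int): Seq[Row] = {
    // Outer-side rows are preserved; the fields of the other side become nulls.
    val paddedLeft = left.map(l => l ++ Vector.fill[Any](rightArity)(null))
    val paddedRight = right.map(r => Vector.fill[Any](leftArity)(null) ++ r)
    kind match {
      case Inner      => Seq.empty
      case LeftOuter  => paddedLeft
      case RightOuter => paddedRight
      case FullOuter  => paddedLeft ++ paddedRight
    }
  }
}
```

Only the inner join is actually empty; every outer variant still forwards the rows of its preserved side, which is why the join type has to be passed along.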
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305850#comment-16305850 ]

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012976

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -382,6 +498,29 @@ abstract class TimeBoundedStreamInnerJoin(
     }
   }

+  /**
+    * Return a padding result with the given left/right row.
+    * @param row the row to pad
+    * @param paddingLeft pad left or right
+    * @return the null padding result
+    */
+  private def paddingResult(row: Row, paddingLeft: Boolean): Row = {
--- End diff --

Move this method into a util class and split it into two methods (`padLeft` and `padRight`). The methods should be `final`.
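A possible shape for such a utility, as a sketch only: `PaddingSketch` is a hypothetical name, rows are modelled as `Array[Any]` rather than Flink's `Row`, and the actual `OuterJoinPaddingUtil` added in the PR may differ (for example by reusing result objects). Following the usage elsewhere in this thread, `padLeft` takes a left row and fills the right-hand fields with nulls, and `padRight` does the opposite.

```scala
// Hypothetical stand-in for the padding utility; Serializable so that it can be
// captured by user functions that are shipped to workers.
final class PaddingSketch(leftArity: Int, rightArity: Int) extends Serializable {
  private val resultArity = leftArity + rightArity

  /** Keeps the left fields and leaves the right-hand fields null. */
  def padLeft(left: Array[Any]): Array[Any] = {
    require(left.length == leftArity, "unexpected left arity")
    val out = new Array[Any](resultArity) // elements start out as null
    Array.copy(left, 0, out, 0, leftArity)
    out
  }

  /** Keeps the right fields and leaves the left-hand fields null. */
  def padRight(right: Array[Any]): Array[Any] = {
    require(right.length == rightArity, "unexpected right arity")
    val out = new Array[Any](resultArity)
    Array.copy(right, 0, out, leftArity, rightArity)
    out
  }
}
```

Because the class itself is `final`, both methods are effectively final as well, and two separate methods avoid a per-row branch on a boolean flag.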
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305856#comment-16305856 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159013510 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StreamWindowJoinHarnessTest.scala --- @@ -20,20 +20,25 @@ package org.apache.flink.table.runtime.harness import java.lang.{Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness import org.apache.flink.table.api.Types import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector} -import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin} +import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin} import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.junit.Assert.assertEquals import org.junit.Test -class JoinHarnessTest extends HarnessTestBase { +/** + * Since the runtime logic for different stream window joins are identical, we only test on + * inner join. + */ +class StreamWindowJoinHarnessTest extends HarnessTestBase { --- End diff -- We should also add harness tests for the outer joins. These are the only tests that can test certain edge cases because the order of inputs can be precisely controlled. 
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305854#comment-16305854 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159011797 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Collector to track whether there's a joined result. + */ +class JoinAwareCollector extends Collector[Row]{ --- End diff -- I would make this class simpler and move some of the logic into the join class. Instead of tracking if something was emitted across multiple resets, I'd just check if something was emitted since a single reset. The join can have a flag that checks the input row was emitted by joining against the state. 
Moreover, I'd make some variables public and remove the accessors to reduce the number of method calls. If we do this, we don't need
- `setCollector` (can be set by directly modifying the public var)
- `emittedThisTurn` (we only need one emission flag)
- `resetThisTurn()` (we only need one emission flag)
- `everEmitted` (emitted can be directly accessed as public var)
- `everEmittedThisTurn` (only one emission flag)
- `collectWithoutNotifying` (we can simply emit because we don't have an emission flag across multiple resets)
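A simplified collector along these lines might look like the following sketch. It does not use Flink's `Collector` or `CRow`; `Sink` is a hypothetical stand-in interface, and `EmitAwareCollector` is an illustrative name rather than the class that was eventually merged.

```scala
object EmitAwareCollectorSketch {
  /** Minimal stand-in for a downstream collector interface (assumption). */
  trait Sink[T] { def collect(record: T): Unit }

  /** Forwards records and remembers whether anything was emitted since the last reset. */
  final class EmitAwareCollector[T] extends Sink[T] {
    // Public vars: the join sets and reads them directly, without accessor calls.
    var innerCollector: Sink[T] = null
    var emitted: Boolean = false

    /** Clears the single emission flag before probing the next row. */
    def reset(): Unit = { emitted = false }

    override def collect(record: T): Unit = {
      emitted = true
      innerCollector.collect(record)
    }
  }
}
```

With a single flag, the join would call `reset()` before joining one input row against the cached state and read `emitted` afterwards to decide whether a null-padded row is still owed.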
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305859#comment-16305859 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159013964 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase { StreamITCase.compareWithList(expected) } + // Tests for left outer join + @Test + def testProcTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +|t2.proctime + INTERVAL '3' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + } + + @Test + def testRowTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) 
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear + +val sqlQuery = + """ +|SELECT t2.key, t2.id, t1.id +|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON +| t1.key = t2.key AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(String, String, Long)] +// for boundary test +data1.+=(("A", "L-1", 1000L)) +data1.+=(("A", "L-2", 2000L)) +data1.+=(("B", "L-4", 4000L)) +data1.+=(("A", "L-6", 6000L)) +data1.+=(("C", "L-7", 7000L)) +data1.+=(("A", "L-10", 10000L)) +data1.+=(("A", "L-12", 12000L)) +data1.+=(("A", "L-20", 20000L)) + +val data2 = new mutable.MutableList[(String, String, Long)] --- End diff -- Add a row to the right data set such that one left row joins with two right rows.
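To see what the requested extra row would exercise, here is a small batch model of the query in plain Scala. It is only a sketch: it ignores watermarks, state cleanup, and emission order, the `Event` type and `leftOuterJoin` helper are hypothetical, and the right row `R-11` is an invented example of a row that makes one left row match twice.

```scala
object WindowedLeftOuterJoinSketch {
  final case class Event(key: String, id: String, rt: Long)

  /**
   * Batch model of
   *   T1 LEFT OUTER JOIN T2 ON t1.key = t2.key
   *     AND t1.rt BETWEEN t2.rt - lowerMs AND t2.rt + upperMs
   * projecting (t2.key, t2.id, t1.id). Unmatched left rows get nulls on the right.
   */
  def leftOuterJoin(left: Seq[Event], right: Seq[Event], lowerMs: Long, upperMs: Long): Seq[String] =
    left.flatMap { l =>
      val matches = right.filter { r =>
        r.key == l.key && l.rt >= r.rt - lowerMs && l.rt <= r.rt + upperMs
      }
      if (matches.isEmpty) Seq(s"null,null,${l.id}")
      else matches.map(r => s"${r.key},${r.id},${l.id}")
    }
}
```

With right rows `("A", "R-6", 6000)` and the hypothetical `("A", "R-11", 11000)`, the left row `L-6` falls into both windows and produces two results, while `L-7` (key `C`) still produces a single null-padded row.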
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305857#comment-16305857 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159014072 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase { StreamITCase.compareWithList(expected) } + // Tests for left outer join + @Test + def testProcTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +|t2.proctime + INTERVAL '3' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + } + + @Test + def testRowTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) 
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear + +val sqlQuery = + """ +|SELECT t2.key, t2.id, t1.id +|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON +| t1.key = t2.key AND +| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(String, String, Long)] +// for boundary test +data1.+=(("A", "L-1", 1000L)) +data1.+=(("A", "L-2", 2000L)) +data1.+=(("B", "L-4", 4000L)) +data1.+=(("A", "L-6", 6000L)) +data1.+=(("C", "L-7", 7000L)) +data1.+=(("A", "L-10", 10000L)) +data1.+=(("A", "L-12", 12000L)) +data1.+=(("A", "L-20", 20000L)) + +val data2 = new mutable.MutableList[(String, String, Long)] +data2.+=(("A", "R-6", 6000L)) +data2.+=(("B", "R-7", 7000L)) +data2.+=(("D", "R-8", 8000L)) + +val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) +val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() +val expected = new java.util.ArrayList[String] +expected.add("A,R-6,L-1") +expected.add("A,R-6,L-2") +expected.add("A,R-6,L-6") +expected.add("A,R-6,L-10") +expected.add("A,R-6,L-12") +expected.add("B,R-7,L-4") +expected.add("null,null,L-7") +expected.add("null,null,L-20") +StreamITCase.compareWithList(expected) + } + + // Tests for right outer join + @Test + def testProcTimeRightOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.proctime BETWEEN 
t2.proctime - I
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305852#comment-16305852 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159011164 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin( if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) { val rightRows = rightEntry.getValue var i = 0 + var markEmitted = false while (i < rightRows.size) { -joinFunction.join(leftRow, rightRows.get(i), cRowWrapper) +joinCollector.resetThisTurn() +val tuple = rightRows.get(i) +joinFunction.join(leftRow, tuple.f0, joinCollector) +if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) { + if (!tuple.f1 && joinCollector.everEmittedThisTurn) { +// Mark the right row as being successfully joined and emitted. +tuple.f1 = true +markEmitted = true + } +} i += 1 } + if (markEmitted) { +// Write back the edited entry (mark emitted) for the right cache. +rightEntry.setValue(rightRows) + } } if (rightTime <= rightExpirationTime) { + if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { --- End diff -- This should be `joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER` because we preserve the records of the right side. This should be covered by a harness test.
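One way to make this kind of mix-up harder is to name the two conditions once and reuse them. The sketch below assumes a local `JoinKind` type in place of Flink's `JoinType`; the helper names `preservesLeft` and `preservesRight` are hypothetical.

```scala
object JoinSidePreservationSketch {
  // Local stand-in for Flink's JoinType enum (assumption).
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind

  /** True if unmatched LEFT rows must be emitted with nulls on the right. */
  def preservesLeft(kind: JoinKind): Boolean = kind == LeftOuter || kind == FullOuter

  /** True if unmatched RIGHT rows must be emitted with nulls on the left. */
  def preservesRight(kind: JoinKind): Boolean = kind == RightOuter || kind == FullOuter
}
```

When rows are expired from the right cache, the guard would then read `preservesRight(joinType)`, which matches the correction requested in the comment.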
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305858#comment-16305858 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159012674 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -268,13 +364,16 @@ abstract class TimeBoundedStreamInnerJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { +joinCollector.setCollector(out) +joinCollector.reset() --- End diff -- No need to reset
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305853#comment-16305853 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159010195 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -143,29 +172,14 @@ abstract class TimeBoundedStreamInnerJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = { +joinCollector.setCollector(out) --- End diff -- Directly set the variable to avoid the method call (like `CRowWrappingCollector`).
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305849#comment-16305849 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159010693 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin( if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) { val rightRows = rightEntry.getValue var i = 0 + var markEmitted = false --- End diff -- rename to `entryUpdated`
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305860#comment-16305860 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159014190 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -133,6 +149,19 @@ abstract class TimeBoundedStreamInnerJoin( val rightTimerStateDesc: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) + + +// Initialize the two reusable padding results. +var i = 0 +while (i < leftArity) { --- End diff -- Move to util class
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305848#comment-16305848 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r158738627 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Collector to track whether there's a joined result. 
+ */ +class JoinAwareCollector extends Collector[Row]{ + + private var emitted = false + private var emittedThisTurn = false + private var innerCollector: Collector[CRow] = _ + private val cRow: CRow = new CRow() --- End diff -- Explicitly set the `change` value of the `CRow`.
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285756#comment-16285756 ] ASF GitHub Bot commented on FLINK-7797: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5140 Thanks for the PR @xccui. I'll try to have a look at it sometime this week. Best, Fabian
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284543#comment-16284543 ] ASF GitHub Bot commented on FLINK-7797: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5140 [FLINK-7797] [table] Add support for windowed outer joins for streaming tables ## What is the purpose of the change This PR adds support for windowed outer joins for streaming tables. ## Brief change log - Adjusts the plan translation logic to accept stream window outer joins. - Attaches an ever-emitted flag to each row. When a row is removed from the cache (or detected as not cached), a null-padded join result will be emitted if necessary. - Adds a custom `JoinAwareCollector` to track whether there's a successfully joined result for both sides in each join loop. - Adds table/SQL translation tests, and also join integration tests. Since the runtime logic is built on the existing window inner join, no new harness tests are added. - Updates the SQL/Table API docs. ## Verifying this change This PR can be verified by the cases added in `JoinTest` and `JoinITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (**yes**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (**yes**) - If yes, how is the feature documented? 
(**removes the restriction notes**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-7797 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5140 commit 34d3fde8049ec407849b61901acd8258a6a1f919 Author: Xingcan Cui Date: 2017-12-07T17:28:40Z [FLINK-7797] [table] Add support for windowed outer joins for streaming tables
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276856#comment-16276856 ] Fabian Hueske commented on FLINK-7797: -- This sounds good! +1 for adding the boolean flag. I think we can have this also for inner joins to reduce code duplication as it adds only very little overhead. Fabian
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276772#comment-16276772 ] Xingcan Cui commented on FLINK-7797: Hi [~fhueske], I'd like to share my basic thoughts about the implementation. For each cached record, an extra boolean flag will be added to indicate whether the record has ever been successfully joined. We need a custom data collector for the internal join function to update this flag. When a record is removed from the cache and its flag is {{false}}, a null-padded result will be emitted. Since most of the code for the windowed inner join can be reused, it may be better to do some refactoring first. Other details will be considered (and discussed) during the implementation. What do you think of the plan? Best, Xingcan
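The plan above can be sketched in plain Scala as follows. This is an illustration under assumptions, not the merged implementation: the cache is an in-memory map rather than Flink keyed state, `Record` stands in for `Row`, and `CachedRow` and `LeftCache` are hypothetical names.

```scala
import scala.collection.mutable

object FlaggedCacheSketch {
  // Stand-in for Flink's Row (assumption).
  type Record = Vector[Any]

  /** A cached row plus a flag recording whether it has ever produced a join result. */
  final class CachedRow(val row: Record, var joined: Boolean = false)

  /** Cache for the left side of a left outer join, keyed by row time. */
  final class LeftCache(rightArity: Int) {
    private val rowsByTime = mutable.Map.empty[Long, mutable.ArrayBuffer[CachedRow]]

    /** Caches a row and returns its handle so the join can flip the flag later. */
    def add(time: Long, row: Record): CachedRow = {
      val cached = new CachedRow(row)
      rowsByTime.getOrElseUpdate(time, mutable.ArrayBuffer.empty[CachedRow]) += cached
      cached
    }

    /** Removes rows with time <= expirationTime and returns null-padded results for never-joined rows. */
    def expire(expirationTime: Long): Seq[Record] = {
      val expiredTimes = rowsByTime.keys.filter(_ <= expirationTime).toList.sorted
      expiredTimes.flatMap { t =>
        val rows = rowsByTime.remove(t).getOrElse(mutable.ArrayBuffer.empty[CachedRow])
        rows.filterNot(_.joined).map(r => r.row ++ Vector.fill[Any](rightArity)(null))
      }
    }
  }
}
```

A row whose flag was set during probing is dropped silently on expiration, while a row that never matched leaves the cache as a null-padded result, which is the behaviour the comment describes.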
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262185#comment-16262185 ] Fabian Hueske commented on FLINK-7797: -- Excellent! Thank you! :-)
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262178#comment-16262178 ] Xingcan Cui commented on FLINK-7797: Hi [~fhueske], I'd like to work on it. :D
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262171#comment-16262171 ] Fabian Hueske commented on FLINK-7797: -- [~xccui], you implemented the INNER JOIN. Do you want to work on this issue for Flink 1.5? Thanks, Fabian