[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5140 ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159695882
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -196,35 +181,38 @@ abstract class TimeBoundedStreamJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
 val rightRows = rightEntry.getValue
 var i = 0
- var markEmitted = false
+ var entryUpdated = false
 while (i < rightRows.size) {
-joinCollector.resetThisTurn()
+joinCollector.reset()
 val tuple = rightRows.get(i)
 joinFunction.join(leftRow, tuple.f0, joinCollector)
-if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
- if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-// Mark the right row as being successfully joined and emitted.
-tuple.f1 = true
-markEmitted = true
+if (joinCollector.emitted) {
--- End diff --
change to
```
emitted = emitted || joinCollector.emitted
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
  if (!tuple.f1 && joinCollector.emitted) {
    // Mark the right row as being successfully joined and emitted.
    tuple.f1 = true
    entryUpdated = true
  }
}
```
to avoid a condition for inner and left joins ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159719430
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -183,23 +190,48 @@ class DataStreamWindowJoin(
 }
 }
- def createEmptyJoin(
+ def createNegativeWindowSizeJoin(
--- End diff --
I think we can make this even more efficient if we implement this as:
```
def createNegativeWindowSizeJoin(
    joinType: JoinType,
    leftInput: DataStream[CRow],
    rightInput: DataStream[CRow],
    leftArity: Int,
    rightArity: Int,
    returnType: TypeInformation[CRow]): DataStream[CRow] = {

  // we filter all records instead of adding an empty source to preserve the watermarks
  val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftP = leftInput.getParallelism
  val rightP = rightInput.getParallelism

  joinType match {
    case JoinType.INNER =>
      leftInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP))
    case JoinType.LEFT_OUTER =>
      leftInput.map(leftPadder).name("Left Outer Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP))
    case JoinType.RIGHT_OUTER =>
      leftInput.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Right Outer Join").setParallelism(rightP))
    case JoinType.FULL_OUTER =>
      leftInput.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
  }
}
```
We also need to make `OuterJoinPaddingUtil` extend `java.io.Serializable` for this. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159697707
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
 * Remove the expired rows. Register a new timer if the cache still holds valid rows
 * after the cleaning up.
 *
-* @param collector the collector to emit results
--- End diff --
Don't remove parameter documentation for `collector` ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159700432
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
 * @param removeLeft whether to remove the left rows
 */
 private def removeExpiredRows(
+ collector: EmitAwareCollector,
 expirationTime: Long,
- rowCache: MapState[Long, JList[Row]],
+ rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
 timerState: ValueState[Long],
 ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
 removeLeft: Boolean): Unit = {
-val keysIterator = rowCache.keys().iterator()
+val iterator = rowCache.iterator()
 var earliestTimestamp: Long = -1L
-var rowTime: Long = 0L
 // We remove all expired keys and do not leave the loop early.
 // Hence, we do a full pass over the state.
-while (keysIterator.hasNext) {
- rowTime = keysIterator.next
+while (iterator.hasNext) {
+ val entry = iterator.next
+ val rowTime = entry.getKey
 if (rowTime <= expirationTime) {
-keysIterator.remove()
+if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
--- End diff --
Refactor to
```
if (removeLeft &&
    (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padLeft(tuple.f0))
    }
    i += 1
  }
} else if (!removeLeft &&
    (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padRight(tuple.f0))
    }
    i += 1
  }
}
iterator.remove()
```
to reduce the number of conditions. ---
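The intent of the refactoring above — on expiration, the preserved side emits a null-padded result for every cached row that never joined — can be sketched as a small pure function. This is an illustration only: the function name and the `(row, hasJoined)` pairs are stand-ins for the `JTuple2[Row, Boolean]` entries iterated from Flink `MapState` in the real code.

```scala
// Join types, mirroring org.apache.flink.api.java.operators.join.JoinType.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// Given the expiring rows of one side as (row, hasJoined) pairs, return the
// rows that should be emitted with null padding. A row is padded only if
// (a) it never produced a join result and (b) its side is preserved by the
// join type (left side for LEFT/FULL OUTER, right side for RIGHT/FULL OUTER).
def unjoinedRowsToPad(
    expiredRows: Seq[(String, Boolean)],
    joinType: JoinType,
    removeLeft: Boolean): Seq[String] = {
  val sidePreserved =
    if (removeLeft) joinType == LeftOuter || joinType == FullOuter
    else joinType == RightOuter || joinType == FullOuter
  if (sidePreserved) expiredRows.collect { case (row, joined) if !joined => row }
  else Seq.empty
}
```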
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159698104
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
 * Remove the expired rows. Register a new timer if the cache still holds valid rows
 * after the cleaning up.
 *
-* @param collector the collector to emit results
 * @param expirationTime the expiration time for this cache
 * @param rowCache the row cache
 * @param timerState timer state for the opposite stream
 * @param ctx the context to register the cleanup timer
 * @param removeLeft whether to remove the left rows
 */
 private def removeExpiredRows(
- collector: Collector[Row],
--- End diff --
Why did you change the type? `EmitAwareCollector` is a `Collector[Row]`. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159697015 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -288,34 +276,37 @@ abstract class TimeBoundedStreamJoin( if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) { val leftRows = leftEntry.getValue var i = 0 - var markEmitted = false + var entryUpdated = false while (i < leftRows.size) { -joinCollector.resetThisTurn() +joinCollector.reset() val tuple = leftRows.get(i) joinFunction.join(tuple.f0, rightRow, joinCollector) -if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { - if (!tuple.f1 && joinCollector.everEmittedThisTurn) { -// Mark the left row as being successfully joined and emitted. -tuple.f1 = true -markEmitted = true +if (joinCollector.emitted) { --- End diff -- same as for `processElement1()` ---
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159069304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin( if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) { val rightRows = rightEntry.getValue var i = 0 + var markEmitted = false while (i < rightRows.size) { -joinFunction.join(leftRow, rightRows.get(i), cRowWrapper) +joinCollector.resetThisTurn() +val tuple = rightRows.get(i) +joinFunction.join(leftRow, tuple.f0, joinCollector) +if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) { + if (!tuple.f1 && joinCollector.everEmittedThisTurn) { +// Mark the right row as being successfully joined and emitted. +tuple.f1 = true +markEmitted = true + } +} i += 1 } + if (markEmitted) { +// Write back the edited entry (mark emitted) for the right cache. +rightEntry.setValue(rightRows) + } } if (rightTime <= rightExpirationTime) { + if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { --- End diff -- Yes, I should have added a harness test for that. ---
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159069074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Collector to track whether there's a joined result. + */ +class JoinAwareCollector extends Collector[Row]{ + + private var emitted = false + private var emittedThisTurn = false + private var innerCollector: Collector[CRow] = _ + private val cRow: CRow = new CRow() --- End diff -- I'll add a function to set this value in the Collector. ---
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159023858
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
 s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
 s"join: (${joinSelectionToString(schema.relDataType)})"
-joinType match {
- case JoinRelType.INNER =>
-if (relativeWindowSize < 0) {
- LOG.warn(s"The relative window size $relativeWindowSize is negative," +
-" please check the join conditions.")
- createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
-} else {
- if (isRowTime) {
-createRowTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
-)
- } else {
-createProcTimeInnerJoin(
- leftDataStream,
- rightDataStream,
- returnTypeInfo,
- joinOpName,
- joinFunction.name,
- joinFunction.code,
- leftKeys,
- rightKeys
-)
- }
-}
- case JoinRelType.FULL =>
-throw new TableException(
- "Full join between stream and stream is not supported yet.")
- case JoinRelType.LEFT =>
-throw new TableException(
- "Left join between stream and stream is not supported yet.")
- case JoinRelType.RIGHT =>
-throw new TableException(
- "Right join between stream and stream is not supported yet.")
+val flinkJoinType = joinType match {
+ case JoinRelType.INNER => JoinType.INNER
+ case JoinRelType.FULL => JoinType.FULL_OUTER
+ case JoinRelType.LEFT => JoinType.LEFT_OUTER
+ case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+}
+
+if (relativeWindowSize < 0) {
+ LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+" please check the join conditions.")
+ createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --
Yes, you are right. I'll add this part. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159012674 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -268,13 +364,16 @@ abstract class TimeBoundedStreamInnerJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { +joinCollector.setCollector(out) +joinCollector.reset() --- End diff -- No need to reset ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r158738627 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Collector to track whether there's a joined result. + */ +class JoinAwareCollector extends Collector[Row]{ + + private var emitted = false + private var emittedThisTurn = false + private var innerCollector: Collector[CRow] = _ + private val cRow: CRow = new CRow() --- End diff -- explicitly set `change` value of`CRow` ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159012976 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -382,6 +498,29 @@ abstract class TimeBoundedStreamInnerJoin( } } + /** +* Return a padding result with the given left/right row. +* @param row the row to pad +* @param paddingLeft pad left or right +* @return the null padding result +*/ + private def paddingResult(row: Row, paddingLeft: Boolean): Row = { --- End diff -- Move this method into a util class and split it into two method (`padLeft` and `padRight`). The methods should be `final`. ---
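To make the suggestion concrete, here is a minimal sketch of what such a util class could look like. The class name `OuterJoinPaddingUtil` and the `padLeft`/`padRight` split follow the review comments; the flat-array `Row` stand-in exists only to keep the sketch self-contained — the real implementation would operate on `org.apache.flink.types.Row` — and all other details are assumptions, not the merged code.

```scala
// Stand-in for org.apache.flink.types.Row so the sketch is self-contained.
class Row(val arity: Int) extends Serializable {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = fields(i) = v
  def getField(i: Int): Any = fields(i)
}

// Extends Serializable so it can be captured by MapFunction closures,
// as the review notes for createNegativeWindowSizeJoin.
class OuterJoinPaddingUtil(leftArity: Int, rightArity: Int) extends Serializable {

  private val resultArity = leftArity + rightArity
  // Two reusable padding results; the "missing" side simply stays null.
  private val leftNullPaddingResult = new Row(resultArity)   // left side null
  private val rightNullPaddingResult = new Row(resultArity)  // right side null

  // Pads a left row: left fields copied, right side of the result stays null.
  final def padLeft(leftRow: Row): Row = {
    var i = 0
    while (i < leftArity) {
      rightNullPaddingResult.setField(i, leftRow.getField(i))
      i += 1
    }
    rightNullPaddingResult
  }

  // Pads a right row: right fields copied, left side of the result stays null.
  final def padRight(rightRow: Row): Row = {
    var i = 0
    while (i < rightArity) {
      leftNullPaddingResult.setField(leftArity + i, rightRow.getField(i))
      i += 1
    }
    leftNullPaddingResult
  }
}
```

Reusing the two pre-allocated result rows avoids a per-record allocation, at the cost that callers must not hold on to a returned row across calls.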
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159014190 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -133,6 +149,19 @@ abstract class TimeBoundedStreamInnerJoin( val rightTimerStateDesc: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) + + +// Initialize the two reusable padding results. +var i = 0 +while (i < leftArity) { --- End diff -- Move to util class ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159011164
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
 val rightRows = rightEntry.getValue
 var i = 0
+ var markEmitted = false
 while (i < rightRows.size) {
-joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+joinCollector.resetThisTurn()
+val tuple = rightRows.get(i)
+joinFunction.join(leftRow, tuple.f0, joinCollector)
+if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+ if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+// Mark the right row as being successfully joined and emitted.
+tuple.f1 = true
+markEmitted = true
+ }
+}
 i += 1
 }
+ if (markEmitted) {
+// Write back the edited entry (mark emitted) for the right cache.
+rightEntry.setValue(rightRows)
+ }
 }
 if (rightTime <= rightExpirationTime) {
+ if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
--- End diff --
This should be `joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER` because we preserve the records of the right side. This should be covered by a harness test. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159012517 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -241,17 +288,66 @@ abstract class TimeBoundedStreamInnerJoin( if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) { val leftRows = leftEntry.getValue var i = 0 + var markEmitted = false while (i < leftRows.size) { -joinFunction.join(leftRows.get(i), rightRow, cRowWrapper) +joinCollector.resetThisTurn() +val tuple = leftRows.get(i) +joinFunction.join(tuple.f0, rightRow, joinCollector) +if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) { + if (!tuple.f1 && joinCollector.everEmittedThisTurn) { +// Mark the left row as being successfully joined and emitted. +tuple.f1 = true +markEmitted = true + } +} i += 1 } + if (markEmitted) { +// Write back the edited entry (mark emitted) for the right cache. +leftEntry.setValue(leftRows) + } } if (leftTime <= leftExpirationTime) { + if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) { --- End diff -- `JoinType.RIGHT_OUTER` should be `JoinType.LEFT_OUTER` ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159013964
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
 StreamITCase.compareWithList(expected)
 }
+
+ // Tests for left outer join
+ @Test
+ def testProcTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+env.setParallelism(1)
+
+val sqlQuery =
+ """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+| t1.a = t2.a AND
+| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+|t2.proctime + INTERVAL '3' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+ }
+
+ @Test
+ def testRowTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.clear
+
+val sqlQuery =
+ """
+|SELECT t2.key, t2.id, t1.id
+|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+| t1.key = t2.key AND
+| t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+|t2.rt + INTERVAL '6' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(String, String, Long)]
+// for boundary test
+data1.+=(("A", "L-1", 1000L))
+data1.+=(("A", "L-2", 2000L))
+data1.+=(("B", "L-4", 4000L))
+data1.+=(("A", "L-6", 6000L))
+data1.+=(("C", "L-7", 7000L))
+data1.+=(("A", "L-10", 1L))
+data1.+=(("A", "L-12", 12000L))
+data1.+=(("A", "L-20", 2L))
+
+val data2 = new mutable.MutableList[(String, String, Long)]
--- End diff --
Add a row to the right data set such that one left row joins with two right rows. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159013510 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StreamWindowJoinHarnessTest.scala --- @@ -20,20 +20,25 @@ package org.apache.flink.table.runtime.harness import java.lang.{Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue +import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness import org.apache.flink.table.api.Types import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector} -import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin} +import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin} import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.junit.Assert.assertEquals import org.junit.Test -class JoinHarnessTest extends HarnessTestBase { +/** + * Since the runtime logic for different stream window joins are identical, we only test on + * inner join. + */ +class StreamWindowJoinHarnessTest extends HarnessTestBase { --- End diff -- We should also add harness tests for the outer joins. These are the only tests that can test certain edge cases because the order of inputs can be precisely controlled. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159009795 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -62,15 +65,23 @@ abstract class TimeBoundedStreamInnerJoin( with Compiler[FlatJoinFunction[Row, Row, Row]] with Logging { - private var cRowWrapper: CRowWrappingCollector = _ + private val leftArity = leftType.getArity + private val rightArity = rightType.getArity + private val resultArity = leftArity + rightArity + + // two reusable padding results + private val leftNullPaddingResult = new Row(resultArity) --- End diff -- I think we can move the code to generate padded results into a util class that can be reused by other joins. ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159010693 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala --- @@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin( if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) { val rightRows = rightEntry.getValue var i = 0 + var markEmitted = false --- End diff -- rename to `entryUpdated` ---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159011797
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Collector to track whether there's a joined result.
+ */
+class JoinAwareCollector extends Collector[Row]{
--- End diff --
I would make this class simpler and move some of the logic into the join class. Instead of tracking whether something was emitted across multiple resets, I'd just check whether something was emitted since a single reset. The join can have a flag that tracks whether the input row was emitted by joining against the state. Moreover, I'd make some variables public and remove the accessors to reduce the number of method calls.

If we do this, we don't need
- `setCollector` (can be set by directly modifying the public var)
- `emittedThisTurn` (we only need one emission flag)
- `resetThisTurn()` (we only need one emission flag)
- `everEmitted` (`emitted` can be directly accessed as a public var)
- `everEmittedThisTurn` (only one emission flag)
- `collectWithoutNotifying` (we can simply emit because we don't have an emission flag across multiple resets) ---
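A sketch of the slimmed-down collector this comment describes. The names (`emitted`, `reset()`, a public `innerCollector` var) are inferred from the surrounding review comments, not copied from the merged code, and `Collector`/`CRow` are stubbed here so the sketch is self-contained; the real class implements `org.apache.flink.util.Collector[Row]`.

```scala
// Minimal stand-ins for org.apache.flink.util.Collector and
// org.apache.flink.table.runtime.types.CRow, so the sketch compiles alone.
trait Collector[T] {
  def collect(record: T): Unit
  def close(): Unit
}
class CRow(var row: Any = null, var change: Boolean = true)

// One public emission flag instead of per-turn/ever flags and accessors:
// the join resets the flag before probing a cached row list and reads
// `emitted` directly after calling the generated join function.
class EmitAwareCollector extends Collector[Any] {
  var emitted = false                      // true iff collect() ran since the last reset()
  var innerCollector: Collector[CRow] = _  // set directly, no setCollector() needed
  val cRow = new CRow()
  cRow.change = true                       // set the change flag explicitly, per the review

  def reset(): Unit = emitted = false

  override def collect(record: Any): Unit = {
    emitted = true
    cRow.row = record
    innerCollector.collect(cRow)
  }

  override def close(): Unit = innerCollector.close()
}
```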
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159014072 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase { StreamITCase.compareWithList(expected) } + // Tests for left outer join + @Test + def testProcTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +|t2.proctime + INTERVAL '3' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + } + + @Test + def testRowTimeLeftOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.clear + +val sqlQuery = + """ +|SELECT t2.key, t2.id, t1.id +|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON +| t1.key = t2.key AND +| t1.rt BETWEEN 
t2.rt - INTERVAL '5' SECOND AND +|t2.rt + INTERVAL '6' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(String, String, Long)] +// for boundary test +data1.+=(("A", "L-1", 1000L)) +data1.+=(("A", "L-2", 2000L)) +data1.+=(("B", "L-4", 4000L)) +data1.+=(("A", "L-6", 6000L)) +data1.+=(("C", "L-7", 7000L)) +data1.+=(("A", "L-10", 1L)) +data1.+=(("A", "L-12", 12000L)) +data1.+=(("A", "L-20", 2L)) + +val data2 = new mutable.MutableList[(String, String, Long)] +data2.+=(("A", "R-6", 6000L)) +data2.+=(("B", "R-7", 7000L)) +data2.+=(("D", "R-8", 8000L)) + +val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) +val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() +val expected = new java.util.ArrayList[String] +expected.add("A,R-6,L-1") +expected.add("A,R-6,L-2") +expected.add("A,R-6,L-6") +expected.add("A,R-6,L-10") +expected.add("A,R-6,L-12") +expected.add("B,R-7,L-4") +expected.add("null,null,L-7") +expected.add("null,null,L-20") +StreamITCase.compareWithList(expected) + } + + // Tests for right outer join + @Test + def testProcTimeRightOuterJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear +env.setParallelism(1) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND +|t2.proctime + INTERVAL '3' SECOND +|""".stripMargin + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +da
[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159010195

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala ---

```
@@ -143,29 +172,14 @@ abstract class TimeBoundedStreamInnerJoin(
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {

+    joinCollector.setCollector(out)
```

--- End diff --

Directly set the variable to avoid the method call (like `CRowWrappingCollector`). ---
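The suggested change can be sketched as follows. This is a simplified stand-in with hypothetical names (`EmitAwareCollector`, `innerCollector`), not the actual Flink classes: the downstream collector is exposed as a public `var` that callers assign directly on the per-record path, instead of going through a setter method.

```scala
// Minimal Collector stand-in (the real one is org.apache.flink.util.Collector).
trait Collector[T] {
  def collect(record: T): Unit
}

// Hedged sketch: the wrapping collector exposes its target as a public var
// (assigned as `c.innerCollector = out`), avoiding a setter call per record,
// in the style the reviewer attributes to CRowWrappingCollector.
class EmitAwareCollector[T] extends Collector[T] {
  var innerCollector: Collector[T] = _ // set directly by the owning operator
  var emitted = false                  // did the current join round emit a result?

  def reset(): Unit = emitted = false

  override def collect(record: T): Unit = {
    emitted = true
    innerCollector.collect(record)
  }
}
```

The gain is small per record but this sits on the hot path of every joined row, which is why the review flags it.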
[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r158737260

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---

```
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
       s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
       s"join: (${joinSelectionToString(schema.relDataType)})"

-    joinType match {
-      case JoinRelType.INNER =>
-        if (relativeWindowSize < 0) {
-          LOG.warn(s"The relative window size $relativeWindowSize is negative," +
-            " please check the join conditions.")
-          createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
-        } else {
-          if (isRowTime) {
-            createRowTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          } else {
-            createProcTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          }
-        }
-      case JoinRelType.FULL =>
-        throw new TableException(
-          "Full join between stream and stream is not supported yet.")
-      case JoinRelType.LEFT =>
-        throw new TableException(
-          "Left join between stream and stream is not supported yet.")
-      case JoinRelType.RIGHT =>
-        throw new TableException(
-          "Right join between stream and stream is not supported yet.")
+    val flinkJoinType = joinType match {
+      case JoinRelType.INNER => JoinType.INNER
+      case JoinRelType.FULL => JoinType.FULL_OUTER
+      case JoinRelType.LEFT => JoinType.LEFT_OUTER
+      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+    }
+
+    if (relativeWindowSize < 0) {
+      LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+        " please check the join conditions.")
+      createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
```

--- End diff --

Empty outer joins need to be handled differently than empty inner joins because the records of the outer side(s) must be preserved and padded with nulls. Hence, we need to pass the join type and the generated code. ---
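The null padding the comment describes can be sketched as below. This is a hedged stand-in: rows are modeled as `Array[Any]` and the `PaddingUtil` class name is hypothetical (the PR's actual utility, `OuterJoinPaddingUtil`, operates on Flink's `Row` type), but the arity arithmetic is the same: a preserved row from one side is widened to the joined schema with nulls filling the other side's fields.

```scala
// Hedged sketch of outer-join null padding over Array[Any] rows.
class PaddingUtil(leftArity: Int, rightArity: Int) {
  // a left row joined with an absent right side (LEFT / FULL OUTER)
  def padLeft(left: Array[Any]): Array[Any] =
    left ++ Array.fill[Any](rightArity)(null)

  // an absent left side joined with a right row (RIGHT / FULL OUTER)
  def padRight(right: Array[Any]): Array[Any] =
    Array.fill[Any](leftArity)(null) ++ right
}
```

This is what makes the negative-window-size case join-type-dependent: an empty inner join can simply drop all records, while an empty outer join must still map every preserved-side record through `padLeft`/`padRight`.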
[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5140

[FLINK-7797] [table] Add support for windowed outer joins for streaming tables

## What is the purpose of the change

This PR adds support for windowed outer joins for streaming tables.

## Brief change log

- Adjusts the plan translation logic to accept stream window outer joins.
- Attaches an ever emitted flag to each row. When a row is removed from the cache (or detected as not cached), a null padding join result is emitted if necessary.
- Adds a custom `JoinAwareCollector` to track whether there's a successfully joined result for both sides in each join loop.
- Adds table/SQL translation tests as well as join integration tests. Since the runtime logic is built on the existing window inner join, no new harness tests are added.
- Updates the SQL/Table API docs.

## Verifying this change

This PR can be verified by the cases added in `JoinTest` and `JoinITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (**yes**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (**yes**)
- If yes, how is the feature documented? (**removes the restriction notes**)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-7797

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5140.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5140

commit 34d3fde8049ec407849b61901acd8258a6a1f919
Author: Xingcan Cui
Date: 2017-12-07T17:28:40Z

[FLINK-7797] [table] Add support for windowed outer joins for streaming tables ---
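The "ever emitted flag" mechanism from the change log above can be sketched as follows. This is a hedged stand-in with simplified, hypothetical types (`OuterSideCache`, `Array[Any]` rows), not the actual Flink operator state: each cached row carries a boolean that is flipped when the row participates in a join, and rows that leave the cache without ever having joined are emitted with null padding.

```scala
import scala.collection.mutable

// Hedged sketch of the per-row ever-emitted flag for windowed outer joins.
class OuterSideCache(otherArity: Int) {
  private final case class Entry(row: Array[Any], var everEmitted: Boolean)
  private val cache = mutable.Buffer[Entry]()

  def add(row: Array[Any]): Unit = cache += Entry(row, everEmitted = false)

  // called when the join produced a result for the cached row at `idx`
  def markJoined(idx: Int): Unit = cache(idx).everEmitted = true

  // on eviction, rows that never joined get a null-padded result
  // (the outer-join "preserve and pad" semantics)
  def evictAll(emit: Array[Any] => Unit): Unit = {
    cache.foreach { e =>
      if (!e.everEmitted) emit(e.row ++ Array.fill[Any](otherArity)(null))
    }
    cache.clear()
  }
}
```

Deferring the null-padded output until eviction is what makes the approach correct under out-of-order streams: a matching row from the other side may still arrive while the row is cached, so the decision to pad can only be made once the row's window has fully passed.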