[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5327 ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188532278 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- Thanks for the explanation. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- We can also do it as part of FLINK-8429. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r188531694 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- Thanks, now I understand the distinction between producing and just forwarding retractions. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995939 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hello world, how are
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- I planned to add the right join in FLINK-8429. It's OK to add the right join in this PR if you prefer. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995668 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala --- @@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase { ) util.verifyTableTrait(resultTable, expected) } -} + @Test + def testInnerJoinWithoutAgg(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, Int)]('bb, 'c) + +val resultTable = lTable + .join(rTable) + .where('b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, Acc" +), +"false, Acc" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, AccRetract" +), +"false, AccRetract" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testAggFollowedWithLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val countDistinct = new CountDistinct +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + .groupBy('a) + .select('a, countDistinct('c)) + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + binaryNode( +"DataStreamJoin", +"DataStreamScan(true, Acc)", --- End diff -- `testJoin()` has covered this case. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995557 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( -"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => --- End diff -- OK. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995438 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995228 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994673 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994503 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184994194 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. + protected var joinCntState: Array[MapState[Row, Long]] = _ --- End diff -- OK. I created a base class for outer joins with non-equi predicates (`NonWindowOuterJoinWithNonEquiPredicates`). ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184993658 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- An inner join doesn't produce retractions, but left/right/full joins do. For example, a left join will retract its previous non-matched output when a matching row later arrives from the right side. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184993457 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- A join generates retractions if its type is left/right/full. This is different from an aggregation, which generates retractions if `sendsUpdatesAsRetraction(node) && node.producesUpdates` is true. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182691350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182441219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. + protected var joinCntState: Array[MapState[Row, Long]] = _ --- End diff -- Convert this to left/right variable as we did it at other locations as well? In any case this state belongs to `NonWindowLeftRightJoinWithNonEquiPredicates` and does not need to be initialized for `NonWindowLeftRightJoin`. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182660019 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182407417 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala --- @@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode { */ def consumesRetractions: Boolean = false + /** +* Whether the [[DataStreamRel]] produces retraction messages. +*/ + def producesRetractions: Boolean = false --- End diff -- Why isn't `producesUpdates` enough? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182660370 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182488812 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hello world, how are
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182470078 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182658427 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182692511 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala --- @@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase { ) util.verifyTableTrait(resultTable, expected) } -} + @Test + def testInnerJoinWithoutAgg(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, Int)]('bb, 'c) + +val resultTable = lTable + .join(rTable) + .where('b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, Acc" +), +"false, Acc" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + +val expected = + unaryNode( +"DataStreamCalc", +binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, AccRetract" +), +"false, AccRetract" + ) +util.verifyTableTrait(resultTable, expected) + } + + @Test + def testAggFollowedWithLeftJoin(): Unit = { +val util = streamTestForRetractionUtil() +val lTable = util.addTable[(Int, Int)]('a, 'b) +val rTable = util.addTable[(Int, String)]('bb, 'c) + +val countDistinct = new CountDistinct +val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + .groupBy('a) + .select('a, countDistinct('c)) + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + binaryNode( +"DataStreamJoin", +"DataStreamScan(true, Acc)", --- End diff -- Can you also add a test for a join that consumes from an aggregation? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182476347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182700177 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hello world, how are
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182407785 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -60,6 +60,9 @@ class DataStreamJoin( override def needsUpdatesAsRetraction: Boolean = true + // outer join will generate retractions + override def producesRetractions: Boolean = joinType != JoinRelType.INNER --- End diff -- Could you clarify this? An inner join also produces retractions. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182687448 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( --- End diff -- Explain return type. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182664353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( -"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => --- End diff -- Can we move this change into a separate issue and PR? It is not related to outer joins and breaks existing table sinks for Java developers. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182492444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182427888 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala --- @@ -54,238 +44,51 @@ class NonWindowInnerJoin( genJoinFuncName: String, genJoinFuncCode: String, queryConfig: StreamQueryConfig) - extends CoProcessFunction[CRow, CRow, CRow] - with Compiler[FlatJoinFunction[Row, Row, Row]] - with Logging { - - // check if input types implement proper equals/hashCode - validateEqualsHashCode("join", leftType) - validateEqualsHashCode("join", rightType) - - // state to hold left stream element - private var leftState: MapState[Row, JTuple2[Int, Long]] = _ - // state to hold right stream element - private var rightState: MapState[Row, JTuple2[Int, Long]] = _ - private var cRowWrapper: CRowWrappingMultiOutputCollector = _ - - private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime - private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime - private val stateCleaningEnabled: Boolean = minRetentionTime > 1 - - // state to record last timer of left stream, 0 means no timer - private var leftTimer: ValueState[Long] = _ - // state to record last timer of right stream, 0 means no timer - private var rightTimer: ValueState[Long] = _ - - // other condition function - private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { override def open(parameters: Configuration): Unit = { -LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + -s"Code:\n$genJoinFuncCode") -val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - genJoinFuncName, - genJoinFuncCode) -LOG.debug("Instantiating JoinFunction.") -joinFunction = clazz.newInstance() - -// initialize left and right state, the first element of tuple2 indicates how many rows of -// this 
row, while the second element represents the expired time of this row. -val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) -val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( - "left", leftType, tupleTypeInfo) -val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( - "right", rightType, tupleTypeInfo) -leftState = getRuntimeContext.getMapState(leftStateDescriptor) -rightState = getRuntimeContext.getMapState(rightStateDescriptor) - -// initialize timer state -val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) -leftTimer = getRuntimeContext.getState(valueStateDescriptor1) -val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long]) -rightTimer = getRuntimeContext.getState(valueStateDescriptor2) - -cRowWrapper = new CRowWrappingMultiOutputCollector() - } - - /** -* Process left stream records -* -* @param valueC The input value. -* @param ctxThe ctx to register timer or get current time -* @param outThe collector for returning result values. -* -*/ - override def processElement1( - valueC: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - -processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true) - } - - /** -* Process right stream records -* -* @param valueC The input value. -* @param ctxThe ctx to register timer or get current time -* @param outThe collector for returning result values. -* -*/ - override def processElement2( - valueC: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - -processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false) - } - - - /** -* Called when a processing timer trigger. -* Expire left/right records which are expired in left and right state. -* -* @param timestamp The timestamp of the firing timer. 
-* @param ctx The ctx to register timer or get current time -* @param out The collector for returning result values. -*/ - override def onTimer( - timestamp: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, - out:
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182683951 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there is no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there is no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val otherSideRow = otherSideEntry.getKey + val otherSideCntAndExpiredTime = otherSideEntry.getValue + // join + cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0) + callJoinFunction(inputRow, inputRowFromLeft,
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182431730 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for left or right join without + * non-equal predicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code without any non-equi condition + * @param genJoinFuncCode the function name without any non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftRightJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowOuterJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +isLeftJoin, +queryConfig) { + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +val joinType = if (isLeftJoin) "Left" else "Right" +LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The input row will be preserved and appended with null, if +* there is no match. Records will be expired in state if state retention time has been +* specified. +*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Long, Long]], + otherSideState: MapState[Row, JTuple2[Long, Long]], + recordFromLeft: Boolean): Unit = { + +val inputRow = value.row +val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState) + +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) +cRowWrapper.setEmitCnt(0) --- End diff -- Remove this line. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182445400 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala --- @@ -16,35 +16,61 @@ * limitations under the License. */ -package org.apache.flink.table.runtime +package org.apache.flink.table.runtime.join import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.apache.flink.util.Collector /** - * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. + * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector + * can also used to count output record number and do lazy output. */ class CRowWrappingMultiOutputCollector() extends Collector[Row] { private var out: Collector[CRow] = _ - private val outCRow: CRow = new CRow() + private val outCRow: CRow = new CRow(null, true) + // times for collect private var times: Long = 0L + // count how many records have been emitted + private var emitCnt: Long = 0L + // don't collect to downstream if set lazyOutput to true + private var lazyOutput: Boolean = false def setCollector(collector: Collector[CRow]): Unit = this.out = collector def setChange(change: Boolean): Unit = this.outCRow.change = change + def setRow(row: Row): Unit = this.outCRow.row = row + + def getRow(): Row = this.outCRow.row + def setTimes(times: Long): Unit = this.times = times + def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted + + def getEmitCnt(): Long = emitCnt + + def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput = lazyOutput + override def collect(record: Row): Unit = { outCRow.row = record -var i: Long = 0L -while (i < times) { - out.collect(outCRow) - i += 1 +if (!lazyOutput) { + emitCnt += times + var i: Long = 0L + while (i < times) { +out.collect(outCRow) +i += 1 + } } } + def reset(): Unit = { +this.outCRow.change = true --- End diff -- 
Remove this line. The change must be set after every reset call anyway. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182695351 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- Is there a reason why we don't support right outer joins here? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182431840 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for left or right join without + * non-equal predicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code without any non-equi condition + * @param genJoinFuncCode the function name without any non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftRightJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowOuterJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +isLeftJoin, +queryConfig) { + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +val joinType = if (isLeftJoin) "Left" else "Right" +LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The input row will be preserved and appended with null, if +* there is no match. Records will be expired in state if state retention time has been +* specified. +*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Long, Long]], + otherSideState: MapState[Row, JTuple2[Long, Long]], + recordFromLeft: Boolean): Unit = { + +val inputRow = value.row +val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState) --- End diff -- Same object creation issue as above. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408177 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. 
Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) 
+LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( + "left", leftType, tupleTypeInfo) +val
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- I think either is fine as long as they are consistent. ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936623 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. 
Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) 
+LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( + "left", leftType, tupleTypeInfo) +val
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936833 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor) +resultRow = new Row(resultType.getArity) + +LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936767 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified.
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936524 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -149,45 +148,39 @@ object UpdatingPlanChecker { } case j: DataStreamJoin => - val joinType = j.getJoinType - joinType match { -case JoinRelType.INNER => - // get key(s) for inner join - val lInKeys = visit(j.getLeft) - val rInKeys = visit(j.getRight) - if (lInKeys.isEmpty || rInKeys.isEmpty) { -None - } else { -// Output of inner join must have keys if left and right both contain key(s). -// Key groups from both side will be merged by join equi-predicates -val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames -val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames -val joinNames = j.getRowType.getFieldNames - -// if right field names equal to left field names, calcite will rename right -// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b) -// to T2(pk0, b). -val rInNamesToJoinNamesMap = rInNames - .zip(joinNames.subList(lInNames.size, joinNames.length)) - .toMap + // get key(s) for inner join + val lInKeys = visit(j.getLeft) + val rInKeys = visit(j.getRight) + if (lInKeys.isEmpty || rInKeys.isEmpty) { +None + } else { +// Output of inner join must have keys if left and right both contain key(s). --- End diff -- Yes, thank you ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936813 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- Long is safer. I will change all count types to Long. What do you think? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170936660 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170610689 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala --- @@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase { // Proctime window output uncertain results, so assert has been ignored here. } + @Test + def testJoin(): Unit = { --- End diff -- Can this name be more specific, e.g. "inner equality join"? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170296184 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala --- @@ -149,45 +148,39 @@ object UpdatingPlanChecker { } case j: DataStreamJoin => - val joinType = j.getJoinType - joinType match { -case JoinRelType.INNER => - // get key(s) for inner join - val lInKeys = visit(j.getLeft) - val rInKeys = visit(j.getRight) - if (lInKeys.isEmpty || rInKeys.isEmpty) { -None - } else { -// Output of inner join must have keys if left and right both contain key(s). -// Key groups from both side will be merged by join equi-predicates -val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames -val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames -val joinNames = j.getRowType.getFieldNames - -// if right field names equal to left field names, calcite will rename right -// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b) -// to T2(pk0, b). -val rInNamesToJoinNamesMap = rInNames - .zip(joinNames.subList(lInNames.size, joinNames.length)) - .toMap + // get key(s) for inner join + val lInKeys = visit(j.getLeft) + val rInKeys = visit(j.getRight) + if (lInKeys.isEmpty || rInKeys.isEmpty) { +None + } else { +// Output of inner join must have keys if left and right both contain key(s). --- End diff -- remove "inner"? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170613748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170610792 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { --- End diff -- Can this be more specific? like, inner equality join ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170619950 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -985,4 +1017,232 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.close() } + + @Test + def testNonWindowLeftJoinWithOutNonEqualPred() { --- End diff -- `WithOut` ==> `Without` ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170617462 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor) +resultRow = new Row(resultType.getArity) + +LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170618769 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates. + * An MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as join keys are same. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all field from right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- Probably use `[Row, Int]` to match the count type in `LeftSideState` and `RightSideState`? ---
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170601596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170613883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170298988 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. 
Base class for stream-stream non-window Join. + * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultTypethe output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + // check if input types implement proper equals/hashCode + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + // state to hold left stream element + protected var leftState: MapState[Row, JTuple2[Int, Long]] = _ + // state to hold right stream element + protected var rightState: MapState[Row, JTuple2[Int, Long]] = _ + protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // state to record last timer of left stream, 0 means no timer + protected var leftTimer: ValueState[Long] = _ + // state to record last timer of right stream, 0 means no timer + protected var rightTimer: ValueState[Long] = _ + + // other condition function + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + +s"Code:\n$genJoinFuncCode") +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) 
+LOG.debug("Instantiating JoinFunction.") +joinFunction = clazz.newInstance() + +// initialize left and right state, the first element of tuple2 indicates how many rows of +// this row, while the second element represents the expired time of this row. +val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG) +val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]]( + "left", leftType, tupleTypeInfo) +val
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r170608444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates. 
+ * + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function code of other non-equi condition + * @param genJoinFuncCode the function name of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null + private var resultRow: Row = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +resultRow = new Row(resultType.getArity) +LOG.debug("Instantiating NonWindowLeftJoin.") + } + + /** +* Puts or Retract an element from the input stream into state and search the other state to +* output records meet the condition. The result is NULL from the right side, if there is no +* match. Records will be expired in state if state retention time has been specified. 
+*/ + override def processElement( + value: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + timerState: ValueState[Long], + currentSideState: MapState[Row, JTuple2[Int, Long]], + otherSideState: MapState[Row, JTuple2[Int, Long]], + isLeft: Boolean): Unit = { + +val inputRow = value.row +cRowWrapper.reset() +cRowWrapper.setCollector(out) +cRowWrapper.setChange(value.change) + +val curProcessTime = ctx.timerService.currentProcessingTime +val oldCntAndExpiredTime = currentSideState.get(inputRow) +val cntAndExpiredTime = if (null == oldCntAndExpiredTime) { + JTuple2.of(0, -1L) +} else { + oldCntAndExpiredTime +} + +cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) +if (stateCleaningEnabled && timerState.value() == 0) { + timerState.update(cntAndExpiredTime.f1) + ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) +} + +// update current side stream state +if (!value.change) { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1 + if (cntAndExpiredTime.f0 <= 0) { +currentSideState.remove(inputRow) + } else { +currentSideState.put(inputRow, cntAndExpiredTime) + } +} else { + cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1 + currentSideState.put(inputRow, cntAndExpiredTime) +} + +val otherSideIterator =
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/5327 [FLINK-8428] [table] Implement stream-stream non-window left outer join ## What is the purpose of the change Implement a stream-stream non-window left outer join for the SQL/Table API. A simple design doc can be found [here](https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing) ## Brief change log - Add left join - with non-equal predicates - without non-equal predicates - Adapt the retraction rules to left join. Outer joins generate retractions. - Adapt `UpsertTableSink`. The table mode of a dynamic table produced by a left join is Update Mode, even if the table does not include a key definition. - Add inner join test cases that are consistent with the batch test cases. - Add left join test cases that are consistent with the batch test cases. ## Verifying this change This change added tests and can be verified as follows: - Added integration tests for left join with or without non-equal predicates. - Added harness tests for left join with or without non-equal predicates. - Added tests for the AccMode generated by left join. - Added tests for an UpsertSink following a left join. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(already documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink leftjoin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5327.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5327 commit 2766de56d47e1a82f6605eb1dd80d8ea5e697a29 Author: hequn8128 Date: 2018-01-21T04:54:08Z [FLINK-8428] [table] Implement stream-stream non-window left outer join ---