[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5327


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188532278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

Thanks for the explanation.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

We can also do it as part of FLINK-8429.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

Thanks, now I understand the difference between producing and just 
forwarding retractions.
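
To illustrate the distinction, here is a minimal sketch of why a non-windowed left outer join produces retractions even when both inputs are insert-only. It is a simplified, hypothetical model (the names `RetractionSketch`, `Msg` and `leftOuterJoin` are assumptions for illustration, not Flink classes), it handles a single join key, and it ignores input retractions and state expiry:

```scala
// Hypothetical, simplified model; this is not Flink's NonWindowOuterJoin.
object RetractionSketch {
  // change = true is an accumulate message, change = false is a retraction.
  final case class Msg(change: Boolean, left: String, right: Option[String])

  // Processes insert-only events for one join key in arrival order.
  // Left(v) is a row from the left input, Right(v) a row from the right input.
  def leftOuterJoin(events: Seq[Either[String, String]]): Vector[Msg] = {
    var leftRows = Vector.empty[String]
    var rightRows = Vector.empty[String]
    val out = Vector.newBuilder[Msg]
    events.foreach {
      case Left(l) =>
        // No match yet: emit the left row padded with null (None).
        if (rightRows.isEmpty) out += Msg(true, l, None)
        else rightRows.foreach(r => out += Msg(true, l, Some(r)))
        leftRows = leftRows :+ l
      case Right(r) =>
        leftRows.foreach { l =>
          // First match for l: withdraw the null-padded row emitted earlier.
          if (rightRows.isEmpty) out += Msg(false, l, None)
          out += Msg(true, l, Some(r))
        }
        rightRows = rightRows :+ r
    }
    out.result()
  }
}
```

In this model an inner join would never emit `change = false` for insert-only inputs, because it has no null-padded row to withdraw. That matches the intent of `producesRetractions = joinType != JoinRelType.INNER` in the diff above.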


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995939
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hello world, how are 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

I planned to add right join in FLINK-8429. It's OK to add right join in this 
PR if you prefer.
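
As a rough illustration of how one operator can serve both join types through a flag like the `isLeftJoin` argument in the diff above, here is a small batch-style sketch. Everything except the name `isLeftJoin` is hypothetical (`LeftRightJoinSketch`, `outerJoin`, the tuple layout), and it is not the streaming implementation in this PR:

```scala
// Hypothetical sketch; only the flag name isLeftJoin comes from the diff.
object LeftRightJoinSketch {
  // (left value, right value); None stands for the null-padded side.
  type Row = (Option[Int], Option[String])

  // Equi-join on the key; isLeftJoin selects which side is preserved.
  def outerJoin(
      left: Seq[(Int, Int)],     // (key, value)
      right: Seq[(Int, String)], // (key, value)
      isLeftJoin: Boolean): Seq[Row] = {
    // Rows that match on the key are emitted for both join types.
    val matched: Seq[Row] = for {
      (lk, lv) <- left
      (rk, rv) <- right
      if lk == rk
    } yield (Option(lv), Option(rv))
    // Unmatched rows of the preserved side are padded with None.
    val padded: Seq[Row] =
      if (isLeftJoin) {
        left.collect {
          case (k, v) if !right.exists(_._1 == k) => (Option(v), Option.empty[String])
        }
      } else {
        right.collect {
          case (k, v) if !left.exists(_._1 == k) => (Option.empty[Int], Option(v))
        }
      }
    matched ++ padded
  }
}
```

Calling `outerJoin(left, right, isLeftJoin = false)` preserves the right side instead, so a right join needs no separate matching logic in this model.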


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function name of other non-equi condition
+  * @param genJoinFuncCode the function code of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995668
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
 ---
@@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
   )
 util.verifyTableTrait(resultTable, expected)
   }
-}
 
+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+val resultTable = lTable
+  .join(rTable)
+  .where('b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, Acc"
+),
+"false, Acc"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, AccRetract"
+),
+"false, AccRetract"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val countDistinct = new CountDistinct
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+  .groupBy('a)
+  .select('a, countDistinct('c))
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  binaryNode(
+"DataStreamJoin",
+"DataStreamScan(true, Acc)",
--- End diff --

`testJoin()` has covered this case.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
 tableKeys match {
   case Some(keys) => upsertSink.setKeyFields(keys)
   case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-  case None if !isAppendOnlyTable => throw new TableException(
-"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+  case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null =>
--- End diff --

OK.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995438
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995228
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184994673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184994503
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184994194
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
--- End diff --

OK. I created a base class for outer joins with non-equi predicates 
(`NonWindowOuterJoinWithNonEquiPredicates`).


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184993658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

An inner join doesn't produce retractions, but left/right/full joins do. 
For example, a left join retracts the previously emitted non-matched output 
when a new matching row arrives from the right side.
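
As a rough illustration of the behaviour described above, here is a toy, in-memory sketch. It is not the PR's implementation: `Change` and `ToyLeftJoin` are hypothetical names, rows are plain strings, and deletions on the inputs are ignored. It only shows why a left join has to retract a null-padded result once the first matching right row arrives.

```scala
// Toy left outer join on an Int key. Hypothetical names, no Flink dependency.
// accumulate = true means "add this result", false means "retract it".
final case class Change(accumulate: Boolean, left: String, right: Option[String])

class ToyLeftJoin {
  private var leftRows = Map.empty[Int, Vector[String]]
  private var rightRows = Map.empty[Int, Vector[String]]

  /** A left row arrives: join it, or emit it padded with None if nothing matches. */
  def onLeft(key: Int, row: String): Seq[Change] = {
    leftRows = leftRows.updated(key, leftRows.getOrElse(key, Vector.empty[String]) :+ row)
    val matches = rightRows.getOrElse(key, Vector.empty[String])
    if (matches.isEmpty) Seq(Change(accumulate = true, row, None))
    else matches.map(r => Change(accumulate = true, row, Some(r)))
  }

  /** A right row arrives: retract earlier padded results, then emit the joined rows. */
  def onRight(key: Int, row: String): Seq[Change] = {
    val existing = rightRows.getOrElse(key, Vector.empty[String])
    val firstMatch = existing.isEmpty
    rightRows = rightRows.updated(key, existing :+ row)
    leftRows.getOrElse(key, Vector.empty[String]).flatMap { l =>
      val retraction =
        if (firstMatch) Seq(Change(accumulate = false, l, None)) else Seq.empty[Change]
      retraction :+ Change(accumulate = true, l, Some(row))
    }
  }
}
```

An inner join never emits the padded row in the first place, so in this simplified model it has nothing to retract.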


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184993457
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

A join generates retractions if its type is left/right/full. This differs 
from an aggregation, which generates retractions if 
`sendsUpdatesAsRetraction(node) && node.producesUpdates` is true.
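
The two rules can be put side by side in a small sketch. Everything here is a stand-in for illustration: `JoinKind`, `Node`, `Join`, `Aggregate` and `emitsRetractions` are hypothetical names, not the planner's classes, and the two boolean fields merely mirror the condition quoted above.

```scala
// Hypothetical model of the decision, assuming only the two rules stated above.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

sealed trait Node
final case class Join(kind: JoinKind) extends Node
final case class Aggregate(producesUpdates: Boolean, sendsUpdatesAsRetraction: Boolean)
  extends Node

def emitsRetractions(node: Node): Boolean = node match {
  // For a join the answer depends only on the join type.
  case Join(kind) => kind != Inner
  // For an aggregation it depends on how updates are encoded downstream.
  case Aggregate(updates, asRetraction) => asRetraction && updates
}
```

The point of the sketch is that the join case cannot be derived from `producesUpdates` alone, which is the reason given for a separate `producesRetractions` flag.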


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182691350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182441219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
--- End diff --

Convert this to separate left/right variables, as we did at other locations? 
In any case, this state belongs to 
`NonWindowLeftRightJoinWithNonEquiPredicates` and does not need to be 
initialized for `NonWindowLeftRightJoin`.
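
A minimal sketch of the suggested shape, under loud assumptions: the class names `ToyOuterJoinBase` and `ToyOuterJoinWithNonEquiPredicates` are hypothetical, a `mutable.Map[String, Long]` stands in for `MapState[Row, Long]`, and `recordMatch` is an invented helper. It only shows named left/right fields replacing the index-0/index-1 array, declared in the subclass that actually needs them.

```scala
import scala.collection.mutable

// Hypothetical base class: shared by all outer joins, holds no match counters.
abstract class ToyOuterJoinBase {
  def open(): Unit = ()
}

// Hypothetical subclass for joins with non-equi predicates: owns the counters.
class ToyOuterJoinWithNonEquiPredicates extends ToyOuterJoinBase {
  // Named fields instead of joinCntState(0) and joinCntState(1).
  protected var leftJoinCnt: mutable.Map[String, Long] = _
  protected var rightJoinCnt: mutable.Map[String, Long] = _

  override def open(): Unit = {
    super.open()
    leftJoinCnt = mutable.Map.empty
    rightJoinCnt = mutable.Map.empty
  }

  /** Increments and returns the match count for a row on the given side. */
  def recordMatch(fromLeft: Boolean, row: String): Long = {
    val counts = if (fromLeft) leftJoinCnt else rightJoinCnt
    val updated = counts.getOrElse(row, 0L) + 1L
    counts.update(row, updated)
    updated
  }
}
```

With this layout, a subclass for equi-only joins would extend the base class directly and never allocate the two counters.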


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182660019
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182407417
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

Why isn't `producesUpdates` enough?


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182660370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182488812
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
+
+val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hello world, how are 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182470078
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin  the type of join, whether it is the type of 
left join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182658427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182692511
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
 ---
@@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
   )
 util.verifyTableTrait(resultTable, expected)
   }
-}
 
+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+val resultTable = lTable
+  .join(rTable)
+  .where('b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, Acc"
+),
+"false, Acc"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+binaryNode(
+  "DataStreamJoin",
+  "DataStreamScan(true, Acc)",
+  "DataStreamScan(true, Acc)",
+  "false, AccRetract"
+),
+"false, AccRetract"
+  )
+util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+val util = streamTestForRetractionUtil()
+val lTable = util.addTable[(Int, Int)]('a, 'b)
+val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+val countDistinct = new CountDistinct
+val resultTable = lTable
+  .leftOuterJoin(rTable, 'b === 'bb)
+  .select('a, 'b, 'c)
+  .groupBy('a)
+  .select('a, countDistinct('c))
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  binaryNode(
+"DataStreamJoin",
+"DataStreamScan(true, Acc)",
--- End diff --

Can you also add a test for a join that consumes from an aggregation?


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182476347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182700177
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hello world, how are 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182407785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

Could you clarify this? An inner join is producing retractions.
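
For context, here is a minimal sketch of the distinction this comment asks about. It is not code from the PR: it is a toy model that uses no Flink classes, and every name in it (`OuterJoinRetractionSketch`, `Msg`, `Event`, `leftOuterJoin`, `innerJoin`) is hypothetical. It assumes both inputs are append-only and that all rows share one join key. Under those assumptions an inner join only ever emits accumulate messages, while a left outer join has to retract the null-padded row it emitted earlier once the first matching right row arrives.

```scala
// Toy model only: no Flink classes are used and every name here is hypothetical.
object OuterJoinRetractionSketch {

  // change = true is an accumulate message, change = false is a retraction.
  final case class Msg(change: Boolean, left: String, right: Option[String])

  // An input event is Left(row) for the left stream or Right(row) for the right stream.
  // Both inputs are append-only and every row is assumed to share one join key.
  type Event = Either[String, String]

  def leftOuterJoin(events: Seq[Event]): Vector[Msg] = {
    var leftRows = Vector.empty[String]
    var rightRows = Vector.empty[String]
    var out = Vector.empty[Msg]

    events.foreach {
      case Left(l) =>
        if (rightRows.isEmpty) {
          // No match yet: preserve the left row with a null (None) right side.
          out = out :+ Msg(true, l, None)
        } else {
          out = out ++ rightRows.map(r => Msg(true, l, Some(r)))
        }
        leftRows = leftRows :+ l
      case Right(r) =>
        if (rightRows.isEmpty) {
          // First match arrives: the null-padded rows emitted earlier are now wrong.
          out = out ++ leftRows.map(l => Msg(false, l, None))
        }
        out = out ++ leftRows.map(l => Msg(true, l, Some(r)))
        rightRows = rightRows :+ r
    }
    out
  }

  def innerJoin(events: Seq[Event]): Vector[Msg] = {
    var leftRows = Vector.empty[String]
    var rightRows = Vector.empty[String]
    var out = Vector.empty[Msg]

    events.foreach {
      case Left(l) =>
        // Unmatched rows produce no output, so there is nothing to retract later.
        out = out ++ rightRows.map(r => Msg(true, l, Some(r)))
        leftRows = leftRows :+ l
      case Right(r) =>
        out = out ++ leftRows.map(l => Msg(true, l, Some(r)))
        rightRows = rightRows :+ r
    }
    out
  }
}
```

For the input `Seq(Left("l1"), Right("r1"))`, the outer join sketch emits an accumulate for (l1, null), then a retraction of that row, then an accumulate for (l1, r1). The inner join sketch emits only the last message. This matches the `Acc` versus `AccRetract` traits expected in the RetractionRulesTest diff quoted earlier in this thread. The sketch does not model an inner join forwarding retractions that it receives from its inputs.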


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182687448
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
--- End diff --

Explain return type.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182664353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
 tableKeys match {
   case Some(keys) => upsertSink.setKeyFields(keys)
   case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-  case None if !isAppendOnlyTable => throw new TableException(
-"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+  case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null =>
--- End diff --

Can we move this change into a separate issue and PR? It is not related to 
outer joins and breaks existing table sinks for Java developers. 


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182492444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182427888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
 ---
@@ -54,238 +44,51 @@ class NonWindowInnerJoin(
 genJoinFuncName: String,
 genJoinFuncCode: String,
 queryConfig: StreamQueryConfig)
-  extends CoProcessFunction[CRow, CRow, CRow]
-  with Compiler[FlatJoinFunction[Row, Row, Row]]
-  with Logging {
-
-  // check if input types implement proper equals/hashCode
-  validateEqualsHashCode("join", leftType)
-  validateEqualsHashCode("join", rightType)
-
-  // state to hold left stream element
-  private var leftState: MapState[Row, JTuple2[Int, Long]] = _
-  // state to hold right stream element
-  private var rightState: MapState[Row, JTuple2[Int, Long]] = _
-  private var cRowWrapper: CRowWrappingMultiOutputCollector = _
-
-  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
-  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
-  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
-
-  // state to record last timer of left stream, 0 means no timer
-  private var leftTimer: ValueState[Long] = _
-  // state to record last timer of right stream, 0 means no timer
-  private var rightTimer: ValueState[Long] = _
-
-  // other condition function
-  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
 
   override def open(parameters: Configuration): Unit = {
-LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
-s"Code:\n$genJoinFuncCode")
-val clazz = compile(
-  getRuntimeContext.getUserCodeClassLoader,
-  genJoinFuncName,
-  genJoinFuncCode)
-LOG.debug("Instantiating JoinFunction.")
-joinFunction = clazz.newInstance()
-
-// initialize left and right state, the first element of tuple2 indicates how many rows of
-// this row, while the second element represents the expired time of this row.
-val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
-val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
-  "left", leftType, tupleTypeInfo)
-val rightStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
-  "right", rightType, tupleTypeInfo)
-leftState = getRuntimeContext.getMapState(leftStateDescriptor)
-rightState = getRuntimeContext.getMapState(rightStateDescriptor)
-
-// initialize timer state
-val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
-leftTimer = getRuntimeContext.getState(valueStateDescriptor1)
-val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
-rightTimer = getRuntimeContext.getState(valueStateDescriptor2)
-
-cRowWrapper = new CRowWrappingMultiOutputCollector()
-  }
-
-  /**
-* Process left stream records
-*
-* @param valueC The input value.
-* @param ctx    The ctx to register timer or get current time
-* @param out    The collector for returning result values.
-*
-*/
-  override def processElement1(
-  valueC: CRow,
-  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-  out: Collector[CRow]): Unit = {
-
-processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true)
-  }
-
-  /**
-* Process right stream records
-*
-* @param valueC The input value.
-* @param ctx    The ctx to register timer or get current time
-* @param out    The collector for returning result values.
-*
-*/
-  override def processElement2(
-  valueC: CRow,
-  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-  out: Collector[CRow]): Unit = {
-
-processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false)
-  }
-
-
-  /**
-* Called when a processing timer trigger.
-* Expire left/right records which are expired in left and right state.
-*
-* @param timestamp The timestamp of the firing timer.
-* @param ctx   The ctx to register timer or get current time
-* @param out   The collector for returning result values.
-*/
-  override def onTimer(
-  timestamp: Long,
-  ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
-  out: 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182683951
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer Join.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there is no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there is no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val otherSideRow = otherSideEntry.getKey
+  val otherSideCntAndExpiredTime = otherSideEntry.getValue
+  // join
+  cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+  callJoinFunction(inputRow, inputRowFromLeft, 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182431730
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for left or right join without
+  * non-equal predicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code without any non-equi condition
+  * @param genJoinFuncCode the function name without any non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftRightJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+isLeftJoin,
+queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+val joinType = if (isLeftJoin) "Left" else "Right"
+LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The input row will be preserved and appended with null, if
+* there is no match. Records will be expired in state if state retention time has been
+* specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Long, Long]],
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  recordFromLeft: Boolean): Unit = {
+
+val inputRow = value.row
+val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState)
+
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+cRowWrapper.setEmitCnt(0)
--- End diff --

Remove this line.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182445400
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/CRowWrappingMultiOutputCollector.scala
 ---
@@ -16,35 +16,61 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.join
 
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
 /**
-  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times.
+  * The collector to wrap a [[Row]] into a [[CRow]] and collect it multiple times. This collector
+  * can also used to count output record number and do lazy output.
   */
 class CRowWrappingMultiOutputCollector() extends Collector[Row] {
 
   private var out: Collector[CRow] = _
-  private val outCRow: CRow = new CRow()
+  private val outCRow: CRow = new CRow(null, true)
+  // times for collect
   private var times: Long = 0L
+  // count how many records have been emitted
+  private var emitCnt: Long = 0L
+  // don't collect to downstream if set lazyOutput to true
+  private var lazyOutput: Boolean = false
 
   def setCollector(collector: Collector[CRow]): Unit = this.out = collector
 
   def setChange(change: Boolean): Unit = this.outCRow.change = change
 
+  def setRow(row: Row): Unit = this.outCRow.row = row
+
+  def getRow(): Row = this.outCRow.row
+
   def setTimes(times: Long): Unit = this.times = times
 
+  def setEmitCnt(emitted: Long): Unit = this.emitCnt = emitted
+
+  def getEmitCnt(): Long = emitCnt
+
+  def setLazyOutput(lazyOutput: Boolean): Unit = this.lazyOutput = lazyOutput
+
   override def collect(record: Row): Unit = {
 outCRow.row = record
-var i: Long = 0L
-while (i < times) {
-  out.collect(outCRow)
-  i += 1
+if (!lazyOutput) {
+  emitCnt += times
+  var i: Long = 0L
+  while (i < times) {
+out.collect(outCRow)
+i += 1
+  }
 }
   }
 
+  def reset(): Unit = {
+this.outCRow.change = true
--- End diff --

Remove this line. The change must be set after every reset call anyway.
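
To illustrate the point, here is a minimal, self-contained sketch of a collector whose `reset()` leaves `change` untouched, so that every caller sets it explicitly afterwards. `Row`, `CRow`, `Collector` and `MultiOutputCollector` are simplified stand-ins rather than the Flink classes, and the body of `reset()` is an assumption because the quoted diff is cut off before it ends.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for Flink's Row, CRow and Collector (hypothetical, illustration only).
object CollectorResetSketch {
  final case class Row(fields: Vector[Any])
  final class CRow(var row: Row, var change: Boolean)

  trait Collector[T] { def collect(record: T): Unit }

  class MultiOutputCollector extends Collector[Row] {
    private var out: Collector[CRow] = null
    private val outCRow = new CRow(null, true)
    private var times: Long = 0L
    private var emitCnt: Long = 0L
    private var lazyOutput: Boolean = false

    def setCollector(collector: Collector[CRow]): Unit = out = collector
    def setChange(change: Boolean): Unit = outCRow.change = change
    def setTimes(t: Long): Unit = times = t
    def getEmitCnt: Long = emitCnt
    def setLazyOutput(l: Boolean): Unit = lazyOutput = l

    override def collect(record: Row): Unit = {
      outCRow.row = record
      if (!lazyOutput) {
        emitCnt += times
        var i = 0L
        while (i < times) { out.collect(outCRow); i += 1 }
      }
    }

    // Assumed reset body: clears the per-record bookkeeping but does not touch `change`,
    // because the caller sets it right after every reset.
    def reset(): Unit = {
      outCRow.row = null
      times = 0L
      emitCnt = 0L
      lazyOutput = false
    }
  }

  // Pushes one accumulate record (emitted twice) and one retraction (emitted once)
  // through the collector; returns the observed change flags and the final emit count.
  def demo(): (List[Boolean], Long) = {
    val changes = ListBuffer.empty[Boolean]
    val sink = new Collector[CRow] {
      override def collect(r: CRow): Unit = changes += r.change
    }
    val wrapper = new MultiOutputCollector

    wrapper.reset()
    wrapper.setCollector(sink)
    wrapper.setChange(true)
    wrapper.setTimes(2)
    wrapper.collect(Row(Vector(1, "a")))

    wrapper.reset()
    wrapper.setCollector(sink)
    wrapper.setChange(false)
    wrapper.setTimes(1)
    wrapper.collect(Row(Vector(1, "a")))

    (changes.toList, wrapper.getEmitCnt)
  }
}
```

In this sketch the emit counter is cleared inside `reset()`, so a separate `setEmitCnt(0)` directly after `reset()` would be redundant as well.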


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182695351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

Is there a reason why we don't support right outer joins here?
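
For illustration only, a dispatch that also covers right outer joins could look roughly like the sketch below. It is self-contained and does not use the Flink or Calcite types: `JoinKind`, `Operator` and `select` are hypothetical stand-ins, and the name used for the non-equi operator is a placeholder. It relies only on the `isLeftJoin` flag that `NonWindowLeftRightJoin` already takes in the quoted diff.

```scala
// Hypothetical stand-ins for JoinRelType and the operator classes; not the Flink API.
object JoinDispatchSketch {
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind

  // Describes which operator would be instantiated and the isLeftJoin flag passed to it.
  final case class Operator(name: String, isLeftJoin: Option[Boolean])

  def select(kind: JoinKind, isEqui: Boolean): Operator = kind match {
    case Inner =>
      Operator("NonWindowInnerJoin", None)
    // Left and right outer joins share one operator; the flag tells it which side to preserve.
    case LeftOuter | RightOuter if isEqui =>
      Operator("NonWindowLeftRightJoin", Some(kind == LeftOuter))
    // Placeholder name for an operator that handles non-equi predicates.
    case LeftOuter | RightOuter =>
      Operator("OuterJoinWithNonEquiPredicates (placeholder)", Some(kind == LeftOuter))
    case FullOuter =>
      throw new UnsupportedOperationException("full outer join is not covered by this sketch")
  }
}
```

With such a dispatch, `JoinDispatchSketch.select(JoinDispatchSketch.RightOuter, isEqui = true)` would pick the shared operator with `isLeftJoin` set to false.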


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-19 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r182431840
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for left or right join without
+  * non-equal predicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code without any non-equi condition
+  * @param genJoinFuncCode the function name without any non-equi condition
+  * @param isLeftJoin      the type of join, whether it is the type of left join
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftRightJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowOuterJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+isLeftJoin,
+queryConfig) {
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+val joinType = if (isLeftJoin) "Left" else "Right"
+LOG.debug(s"Instantiating NonWindow${joinType}OuterJoin")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The input row will be preserved and appended with null, if
+* there is no match. Records will be expired in state if state retention time has been
+* specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Long, Long]],
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  recordFromLeft: Boolean): Unit = {
+
+val inputRow = value.row
+val (curProcessTime, _) = updateCurrentSide(value, ctx, timerState, currentSideState)
--- End diff --

Same object creation issue as above.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408177
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream-stream non-window Join.
+  *
+  * @param leftType          the input type of left stream
+  * @param rightType         the input type of right stream
+  * @param resultType        the output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig       the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 indicates how many rows of
+// this row, while the second element represents the expired time of this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
+  "left", leftType, tupleTypeInfo)
+val 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-03-05 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r172408402
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

I think either is fine as long as they are consistent.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936623
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream-stream non-window Join.
+  *
+  * @param leftType          the input type of left stream
+  * @param rightType         the input type of right stream
+  * @param resultType        the output type of join
+  * @param genJoinFuncName   the function code of other non-equi condition
+  * @param genJoinFuncCode   the function name of other non-equi condition
+  * @param queryConfig       the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+// initialize left and right state, the first element of tuple2 indicates how many rows of
+// this row, while the second element represents the expired time of this row.
+val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
+val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
+  "left", leftType, tupleTypeInfo)
+val 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936833
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor)
+resultRow = new Row(resultType.getArity)
+
+LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The result is NULL from the right side, if there is no
+* match. Records will be expired in state if state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+if 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+resultRow = new Row(resultType.getArity)
+LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The result is NULL from the right side, if there is no
+* match. Records will be expired in state if state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+if (stateCleaningEnabled && timerState.value() == 0) {
+  timerState.update(cntAndExpiredTime.f1)
+  ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+}
+
+// update current side stream state
+if (!value.change) {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+  if (cntAndExpiredTime.f0 <= 0) {
+currentSideState.remove(inputRow)
+  } else {
+currentSideState.put(inputRow, cntAndExpiredTime)
+  }
+} else {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+  currentSideState.put(inputRow, cntAndExpiredTime)
+}
+
+val otherSideIterator = 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936524
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -149,45 +148,39 @@ object UpdatingPlanChecker {
   }
 
 case j: DataStreamJoin =>
-  val joinType = j.getJoinType
-  joinType match {
-case JoinRelType.INNER =>
-  // get key(s) for inner join
-  val lInKeys = visit(j.getLeft)
-  val rInKeys = visit(j.getRight)
-  if (lInKeys.isEmpty || rInKeys.isEmpty) {
-None
-  } else {
-// Output of inner join must have keys if left and right both contain key(s).
-// Key groups from both side will be merged by join equi-predicates
-val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
-val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
-val joinNames = j.getRowType.getFieldNames
-
-// if right field names equal to left field names, calcite will rename right
-// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
-// to T2(pk0, b).
-val rInNamesToJoinNamesMap = rInNames
-  .zip(joinNames.subList(lInNames.size, joinNames.length))
-  .toMap
+  // get key(s) for inner join
+  val lInKeys = visit(j.getLeft)
+  val rInKeys = visit(j.getRight)
+  if (lInKeys.isEmpty || rInKeys.isEmpty) {
+None
+  } else {
+// Output of inner join must have keys if left and right both contain key(s).
--- End diff --

Yes, thank you


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936813
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

`Long` is safer. I will change all count types to `Long`. What do you think?
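
A minimal sketch of the overflow concern, for illustration only. It is not part of the PR, and `CountOverflowSketch`, `incInt`, and `incLong` are hypothetical names. On the JVM an `Int` counter silently wraps past `Int.MaxValue`, while a `Long` counter keeps counting:

```scala
object CountOverflowSketch {
  // Incrementing an Int counter wraps around silently at Int.MaxValue.
  def incInt(cnt: Int): Int = cnt + 1

  // The same increment on a Long counter has far more headroom.
  def incLong(cnt: Long): Long = cnt + 1L

  def main(args: Array[String]): Unit = {
    println(incInt(Int.MaxValue))         // wraps to a negative value
    println(incLong(Int.MaxValue.toLong)) // continues past the Int range
  }
}
```

Whether a per-row count can realistically get that large depends on the workload; the sketch only shows what happens if it does.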


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170936660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+resultRow = new Row(resultType.getArity)
+LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The result is NULL from the right side, if there is no
+* match. Records will be expired in state if state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+if (stateCleaningEnabled && timerState.value() == 0) {
+  timerState.update(cntAndExpiredTime.f1)
+  ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+}
+
+// update current side stream state
+if (!value.change) {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+  if (cntAndExpiredTime.f0 <= 0) {
+currentSideState.remove(inputRow)
+  } else {
+currentSideState.put(inputRow, cntAndExpiredTime)
+  }
+} else {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+  currentSideState.put(inputRow, cntAndExpiredTime)
+}
+
+val otherSideIterator = 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170610689
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ---
@@ -201,18 +202,294 @@ class JoinITCase extends StreamingWithStateTestBase {
 // Proctime window output uncertain results, so assert has been ignored here.
   }
 
+  @Test
+  def testJoin(): Unit = {
--- End diff --

Can this be more specific? For example, inner equality join.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170296184
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
@@ -149,45 +148,39 @@ object UpdatingPlanChecker {
   }
 
 case j: DataStreamJoin =>
-  val joinType = j.getJoinType
-  joinType match {
-case JoinRelType.INNER =>
-  // get key(s) for inner join
-  val lInKeys = visit(j.getLeft)
-  val rInKeys = visit(j.getRight)
-  if (lInKeys.isEmpty || rInKeys.isEmpty) {
-None
-  } else {
-// Output of inner join must have keys if left and right both contain key(s).
-// Key groups from both side will be merged by join equi-predicates
-val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
-val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
-val joinNames = j.getRowType.getFieldNames
-
-// if right field names equal to left field names, calcite will rename right
-// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
-// to T2(pk0, b).
-val rInNamesToJoinNamesMap = rInNames
-  .zip(joinNames.subList(lInNames.size, joinNames.length))
-  .toMap
+  // get key(s) for inner join
+  val lInKeys = visit(j.getLeft)
+  val rInKeys = visit(j.getRight)
+  if (lInKeys.isEmpty || rInKeys.isEmpty) {
+None
+  } else {
+// Output of inner join must have keys if left and right both contain key(s).
--- End diff --

remove "inner"?
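
As an aside on the renaming described in the quoted diff (Calcite renames clashing right-side field names, e.g. `T2(pk, b)` becomes `T2(pk0, b)`), here is a small sketch of how right-input names can be mapped to join-output names. It is illustrative only: `JoinFieldNameSketch` and `rightToJoinNames` are hypothetical names, and plain `Seq[String]` stands in for the field-name lists returned by `getRowType.getFieldNames`.

```scala
object JoinFieldNameSketch {
  // The join output lists all left fields first, then all right fields,
  // so the right input's i-th field sits at position leftNames.size + i.
  def rightToJoinNames(
      leftNames: Seq[String],
      rightNames: Seq[String],
      joinNames: Seq[String]): Map[String, String] =
    rightNames.zip(joinNames.drop(leftNames.size)).toMap
}
```

With `T1(pk, a)` and `T2(pk, b)` and join output `(pk, a, pk0, b)`, the right-side key `pk` maps to `pk0`, which is the name under which it would have to be looked up in the join's row type.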


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170613748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin without NonEquiPredicates.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+resultRow = new Row(resultType.getArity)
+LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The result is NULL from the right side, if there is no
+* match. Records will be expired in state if state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+if (stateCleaningEnabled && timerState.value() == 0) {
+  timerState.update(cntAndExpiredTime.f1)
+  ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+}
+
+// update current side stream state
+if (!value.change) {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+  if (cntAndExpiredTime.f0 <= 0) {
+currentSideState.remove(inputRow)
+  } else {
+currentSideState.put(inputRow, cntAndExpiredTime)
+  }
+} else {
+  cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+  currentSideState.put(inputRow, cntAndExpiredTime)
+}
+
+val otherSideIterator = 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170610792
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
--- End diff --

Can this be more specific? For example, inner equality join.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170619950
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
@@ -985,4 +1017,232 @@ class JoinHarnessTest extends HarnessTestBase {
 
 testHarness.close()
   }
+
+  @Test
+  def testNonWindowLeftJoinWithOutNonEqualPred() {
--- End diff --

`WithOut` ==> `Without`


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170617462
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+leftJoinCnt = getRuntimeContext.getMapState(leftJoinCntDescriptor)
+resultRow = new Row(resultType.getArity)
+
+LOG.debug("Instantiating NonWindowLeftJoinWithNonEquiPredicates.")
+  }
+
+  /**
+* Puts or Retract an element from the input stream into state and search the other state to
+* output records meet the condition. The result is NULL from the right side, if there is no
+* match. Records will be expired in state if state retention time has been specified.
+*/
+  override def processElement(
+  value: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow],
+  timerState: ValueState[Long],
+  currentSideState: MapState[Row, JTuple2[Int, Long]],
+  otherSideState: MapState[Row, JTuple2[Int, Long]],
+  isLeft: Boolean): Unit = {
+
+val inputRow = value.row
+cRowWrapper.reset()
+cRowWrapper.setCollector(out)
+cRowWrapper.setChange(value.change)
+
+val curProcessTime = ctx.timerService.currentProcessingTime
+val oldCntAndExpiredTime = currentSideState.get(inputRow)
+val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+  JTuple2.of(0, -1L)
+} else {
+  oldCntAndExpiredTime
+}
+
+cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+if 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170618769
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Only use for LeftJoin with NonEquiPredicates.
+  * An MapState of type [Row, Long] is added to record how many rows from the right table can be
+  * matched for each left row. Left join without NonEquiPredicates doesn't need it because
+  * left rows can always join right rows as long as join keys are same.
+  *
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param resultType      the output type of join
+  * @param genJoinFuncName the function code of other non-equi condition
+  * @param genJoinFuncCode the function name of other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoinWithNonEquiPredicates(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all field from right will be null
+  private var resultRow: Row = _
+  // how many matched rows from the right table for each left row
+  private var leftJoinCnt: MapState[Row, Long] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long](
--- End diff --

Probably use `[Row, Int]` to match the count type in `LeftSideState` and `RightSideState`?
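
For context, a rough sketch of the bookkeeping that the class comment describes for this state: a count, per left row, of how many right rows currently match. A plain mutable map stands in for Flink's `MapState` and strings stand in for `Row`. Those substitutions, the names `LeftJoinCountSketch` and `MatchCounts`, and the choice to drop entries that reach zero are all assumptions made for illustration, not the PR's code.

```scala
import scala.collection.mutable

object LeftJoinCountSketch {
  // Stand-in for a keyed MapState[Row, Long]: matched right rows per left row.
  final class MatchCounts {
    private val counts = mutable.Map.empty[String, Long]

    // Applies a delta (+1 for a new match, -1 for a retracted one) and
    // returns the resulting count. Entries that drop to zero are removed.
    def update(leftRow: String, delta: Long): Long = {
      val next = counts.getOrElse(leftRow, 0L) + delta
      if (next <= 0L) counts.remove(leftRow) else counts.update(leftRow, next)
      math.max(next, 0L)
    }

    // Number of right rows currently matching the given left row.
    def matches(leftRow: String): Long = counts.getOrElse(leftRow, 0L)
  }
}
```

Under this reading, a left row would presumably need a null-padded result only while its count is zero. The count's value type (`Int` or `Long`) does not change the logic, only the headroom.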


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170601596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connects data from the left and right streams. Only used for left joins without
+  * non-equi predicates.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+class NonWindowLeftJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null
+  private var resultRow: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    resultRow = new Row(resultType.getArity)
+    LOG.debug("Instantiating NonWindowLeftJoin.")
+  }
+
+  /**
+    * Puts an element from the input stream into state, or retracts it, and searches the
+    * other side's state to output the records that meet the join condition. The right-side
+    * fields of the result are NULL if there is no match. Records expire from state if a
+    * state retention time has been specified.
+    */
+  override def processElement(
+      value: CRow,
+      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+      out: Collector[CRow],
+      timerState: ValueState[Long],
+      currentSideState: MapState[Row, JTuple2[Int, Long]],
+      otherSideState: MapState[Row, JTuple2[Int, Long]],
+      isLeft: Boolean): Unit = {
+
+    val inputRow = value.row
+    cRowWrapper.reset()
+    cRowWrapper.setCollector(out)
+    cRowWrapper.setChange(value.change)
+
+    val curProcessTime = ctx.timerService.currentProcessingTime
+    val oldCntAndExpiredTime = currentSideState.get(inputRow)
+    val cntAndExpiredTime = if (null == oldCntAndExpiredTime) {
+      JTuple2.of(0, -1L)
+    } else {
+      oldCntAndExpiredTime
+    }
+
+    cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1)
+    if (stateCleaningEnabled && timerState.value() == 0) {
+      timerState.update(cntAndExpiredTime.f1)
+      ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1)
+    }
+
+    // update current side stream state
+    if (!value.change) {
+      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 - 1
+      if (cntAndExpiredTime.f0 <= 0) {
+        currentSideState.remove(inputRow)
+      } else {
+        currentSideState.put(inputRow, cntAndExpiredTime)
+      }
+    } else {
+      cntAndExpiredTime.f0 = cntAndExpiredTime.f0 + 1
+      currentSideState.put(inputRow, cntAndExpiredTime)
+    }
+
+    val otherSideIterator = 
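
For reference, the count bookkeeping in the quoted `processElement` can be sketched without Flink. This is only a minimal sketch under stated assumptions, not the PR's code: `SideState` is a hypothetical stand-in for `MapState[Row, JTuple2[Int, Long]]`, and the expiration time is passed in by the caller instead of being computed by `getNewExpiredTime`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapState[Row, JTuple2[Int, Long]]:
// maps a row to (number of copies seen, expiration timestamp).
final class SideState[R] {
  private val rows = mutable.Map.empty[R, (Int, Long)]

  /** Applies one change message; isAccumulate = false models a retraction. */
  def update(row: R, isAccumulate: Boolean, expireAt: Long): Unit = {
    val oldCnt = rows.get(row).map(_._1).getOrElse(0)
    val newCnt = if (isAccumulate) oldCnt + 1 else oldCnt - 1
    if (newCnt <= 0) {
      rows.remove(row) // last copy retracted: drop the entry entirely
    } else {
      rows.update(row, (newCnt, expireAt))
    }
  }

  /** Number of copies of `row` currently held, 0 if absent. */
  def count(row: R): Int = rows.get(row).map(_._1).getOrElse(0)
}
```

Keeping a count per distinct row, rather than a list of rows, means duplicate rows cost one map entry, and a retraction only removes the entry once the count drops to zero.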

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170613883
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170298988
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala
 ---
@@ -0,0 +1,271 @@
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connects data from the left and right streams. Base class for stream-stream
+  * non-window joins.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    queryConfig: StreamQueryConfig)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  // state to hold left stream element
+  protected var leftState: MapState[Row, JTuple2[Int, Long]] = _
+  // state to hold right stream element
+  protected var rightState: MapState[Row, JTuple2[Int, Long]] = _
+  protected var cRowWrapper: CRowWrappingMultiOutputCollector = _
+
+  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  // state to record last timer of left stream, 0 means no timer
+  protected var leftTimer: ValueState[Long] = _
+  // state to record last timer of right stream, 0 means no timer
+  protected var rightTimer: ValueState[Long] = _
+
+  // other condition function
+  protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+      s"Code:\n$genJoinFuncCode")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genJoinFuncName,
+      genJoinFuncCode)
+    LOG.debug("Instantiating JoinFunction.")
+    joinFunction = clazz.newInstance()
+
+    // initialize left and right state: the first element of the tuple is the number of
+    // copies of this row, the second element is the expiration time of this row.
+    val tupleTypeInfo = new TupleTypeInfo[JTuple2[Int, Long]](Types.INT, Types.LONG)
+    val leftStateDescriptor = new MapStateDescriptor[Row, JTuple2[Int, Long]](
+      "left", leftType, tupleTypeInfo)
+    val 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-02-26 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r170608444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoin.scala
 ---
@@ -0,0 +1,179 @@

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-01-20 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5327

[FLINK-8428] [table] Implement stream-stream non-window left outer join


## What is the purpose of the change

Implement a stream-stream non-window left outer join for SQL and the Table API. A 
simple design doc can be found 
[here](https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing).


## Brief change log

  - Add left join
    - with non-equal predicates
    - without non-equal predicates
  - Adapt the retraction rules to left join. Outer joins generate retractions.
  - Adapt `UpsertTableSink`. The table mode of a dynamic table produced by a left 
    join is Update Mode, even if the table does not include a key definition.
  - Add inner join test cases that are consistent with the batch test cases.
  - Add left join test cases that are consistent with the batch test cases.
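
To illustrate why an outer join generates retractions, here is a minimal sketch in plain Scala. It is not the implementation in this PR: `Rec`, `Change`, `Out` and `LeftJoinSketch` are hypothetical names, the join condition is a plain key equality, and state lives in ordinary maps instead of Flink state. It also assumes a retraction only arrives for a row that was accumulated before.

```scala
import scala.collection.mutable

// Hypothetical simplified types; the PR itself works on Flink's Row and CRow.
final case class Rec(key: Int, payload: String)
final case class Change(accumulate: Boolean, row: Rec) // accumulate = false: retraction
final case class Out(accumulate: Boolean, left: Rec, right: Option[Rec]) // None: NULL right side

final class LeftJoinSketch {
  // Row -> number of copies currently held, one map per input side.
  private val left = mutable.Map.empty[Rec, Int]
  private val right = mutable.Map.empty[Rec, Int]

  private def bump(m: mutable.Map[Rec, Int], r: Rec, delta: Int): Unit = {
    val c = m.getOrElse(r, 0) + delta
    if (c <= 0) m.remove(r) else m.update(r, c)
  }

  // All stored rows with the given key, repeated according to their count.
  private def matching(m: mutable.Map[Rec, Int], key: Int): Seq[Rec] =
    m.toSeq.collect { case (r, n) if r.key == key => Seq.fill(n)(r) }.flatten

  def onLeft(c: Change): Seq[Out] = {
    bump(left, c.row, if (c.accumulate) 1 else -1)
    val ms = matching(right, c.row.key)
    if (ms.isEmpty) Seq(Out(c.accumulate, c.row, None)) // no match: pad with NULL
    else ms.map(r => Out(c.accumulate, c.row, Some(r)))
  }

  def onRight(c: Change): Seq[Out] = {
    val before = matching(right, c.row.key).size
    bump(right, c.row, if (c.accumulate) 1 else -1)
    val after = matching(right, c.row.key).size
    if (before == after) Seq.empty // retraction of an unknown row: nothing to do
    else matching(left, c.row.key).flatMap { l =>
      if (c.accumulate && before == 0)
        // first match for this left row: retract the NULL-padded result first
        Seq(Out(false, l, None), Out(true, l, Some(c.row)))
      else if (!c.accumulate && after == 0)
        // last match is gone: retract the joined row, restore the NULL-padded one
        Seq(Out(false, l, Some(c.row)), Out(true, l, None))
      else Seq(Out(c.accumulate, l, Some(c.row)))
    }
  }
}
```

The two branches in `onRight` that emit a pair of messages are the reason a left join produces an updating table even when both inputs are append-only.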


## Verifying this change

This change added tests and can be verified as follows:

  - Added integration tests for left join with and without non-equal predicates.
  - Added harness tests for left join with and without non-equal predicates.
  - Added tests for the AccMode generated by left join.
  - Added tests for an UpsertSink following a left join.
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (already documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink leftjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5327


commit 2766de56d47e1a82f6605eb1dd80d8ea5e697a29
Author: hequn8128 
Date:   2018-01-21T04:54:08Z

[FLINK-8428] [table] Implement stream-stream non-window left outer join




---