[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16318351#comment-16318351
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5140


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316873#comment-16316873
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5140
  
Thanks for the update and excellent work.
PR is good to merge :-)


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314404#comment-16314404
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5140
  
Hi @fhueske, thanks for your concrete suggestions! IMO, the refactorings in 
`TimeBoundedStreamJoin` are quite reasonable, while the refactoring for 
`createNegativeWindowSizeJoin()` may not be so significant as the negative 
window size should be taken as an exception. Anyway, I've applied them for 
better efficiency. 😄

Thanks, Xingcan


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311794#comment-16311794
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159719430
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -183,23 +190,48 @@ class DataStreamWindowJoin(
 }
   }
 
-  def createEmptyJoin(
+  def createNegativeWindowSizeJoin(
--- End diff --

I think we can make this even more efficient if we implement this as:

```
def createNegativeWindowSizeJoin(
joinType: JoinType,
leftInput: DataStream[CRow],
rightInput: DataStream[CRow],
leftArity: Int,
rightArity: Int,
returnType: TypeInformation[CRow]): DataStream[CRow] = {

  // we filter all records instead of adding an empty source to preserve 
the watermarks
  val allFilter = new FlatMapFunction[CRow, CRow] with 
ResultTypeQueryable[CRow] {
override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftPadder = new MapFunction[CRow, CRow] with 
ResultTypeQueryable[CRow] {
val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
override def map(value: CRow): CRow = new 
CRow(paddingUtil.padLeft(value.row), true)
override def getProducedType: TypeInformation[CRow] = returnType
  }

  val rightPadder = new MapFunction[CRow, CRow] with 
ResultTypeQueryable[CRow] {
val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
override def map(value: CRow): CRow = new 
CRow(paddingUtil.padRight(value.row), true)
override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftP = leftInput.getParallelism
  val rightP = rightInput.getParallelism

  joinType match {
case JoinType.INNER =>
  leftInput.flatMap(allFilter).name("Empty Inner 
Join").setParallelism(leftP)
.union(rightInput.flatMap(allFilter).name("Empty Inner 
Join").setParallelism(rightP))
case JoinType.LEFT_OUTER =>
  leftInput.map(leftPadder).name("Left Outer 
Join").setParallelism(leftP)
.union(rightInput.flatMap(allFilter).name("Left Outer 
Join").setParallelism(rightP))
case JoinType.RIGHT_OUTER =>
  leftInput.flatMap(allFilter).name("Right Outer 
Join").setParallelism(leftP)
.union(rightInput.map(rightPadder).name("Right Outer 
Join").setParallelism(rightP))
case JoinType.FULL_OUTER =>
  leftInput.map(leftPadder).name("Full Outer 
Join").setParallelism(leftP)
.union(rightInput.map(rightPadder).name("Full Outer 
Join").setParallelism(rightP))
  }
}
```

We also need to make `OuterJoinPaddingUtil` extend `java.io.Serializable` 
for this.



> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311793#comment-16311793
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159695882
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -196,35 +181,38 @@ abstract class TimeBoundedStreamJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= 
rightQualifiedUpperBound) {
   val rightRows = rightEntry.getValue
   var i = 0
-  var markEmitted = false
+  var entryUpdated = false
   while (i < rightRows.size) {
-joinCollector.resetThisTurn()
+joinCollector.reset()
 val tuple = rightRows.get(i)
 joinFunction.join(leftRow, tuple.f0, joinCollector)
-if (joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER) {
-  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-// Mark the right row as being successfully joined and 
emitted.
-tuple.f1 = true
-markEmitted = true
+if (joinCollector.emitted) {
--- End diff --

change to

```
emitted = emitted || joinCollector.emitted
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
  if (!tuple.f1 && joinCollector.emitted) {
// Mark the right row as being successfully joined and emitted.
tuple.f1 = true
entryUpdated = true
  }
}
```

to avoid a condition for inner and left joins
  


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311792#comment-16311792
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159700432
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
 * @param removeLeft whether to remove the left rows
 */
   private def removeExpiredRows(
+  collector: EmitAwareCollector,
   expirationTime: Long,
-  rowCache: MapState[Long, JList[Row]],
+  rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
   timerState: ValueState[Long],
   ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
   removeLeft: Boolean): Unit = {
 
-val keysIterator = rowCache.keys().iterator()
+val iterator = rowCache.iterator()
 
 var earliestTimestamp: Long = -1L
-var rowTime: Long = 0L
 
 // We remove all expired keys and do not leave the loop early.
 // Hence, we do a full pass over the state.
-while (keysIterator.hasNext) {
-  rowTime = keysIterator.next
+while (iterator.hasNext) {
+  val entry = iterator.next
+  val rowTime = entry.getKey
   if (rowTime <= expirationTime) {
-keysIterator.remove()
+if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
--- End diff --

Refactor to 

```
if (removeLeft &&
  (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
val tuple = rows.get(i)
if (!tuple.f1) {
  // Emit a null padding result if the row has never been successfully 
joined.
  collector.collect(paddingUtil.padLeft(tuple.f0))
}
i += 1
  }
} else if (!removeLeft &&
  (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
val tuple = rows.get(i)
if (!tuple.f1) {
  // Emit a null padding result if the row has never been successfully 
joined.
  collector.collect(paddingUtil.padRight(tuple.f0))
}
i += 1
  }
}
iterator.remove()
```

to reduce the number of conditions.
  


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311789#comment-16311789
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159698104
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
 * Remove the expired rows. Register a new timer if the cache still 
holds valid rows
 * after the cleaning up.
 *
-* @param collector  the collector to emit results
 * @param expirationTime the expiration time for this cache
 * @param rowCache   the row cache
 * @param timerState timer state for the opposite stream
 * @param ctxthe context to register the cleanup timer
 * @param removeLeft whether to remove the left rows
 */
   private def removeExpiredRows(
-  collector: Collector[Row],
--- End diff --

Why did you change the type? 
`EmitAwareCollector` is a `Collector[Row]`.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311790#comment-16311790
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159697707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
 * Remove the expired rows. Register a new timer if the cache still 
holds valid rows
 * after the cleaning up.
 *
-* @param collector  the collector to emit results
--- End diff --

Don't remove parameter documentation for `collector`


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311791#comment-16311791
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159697015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -288,34 +276,37 @@ abstract class TimeBoundedStreamJoin(
 if (leftTime >= leftQualifiedLowerBound && leftTime <= 
leftQualifiedUpperBound) {
   val leftRows = leftEntry.getValue
   var i = 0
-  var markEmitted = false
+  var entryUpdated = false
   while (i < leftRows.size) {
-joinCollector.resetThisTurn()
+joinCollector.reset()
 val tuple = leftRows.get(i)
 joinFunction.join(tuple.f0, rightRow, joinCollector)
-if (joinType == JoinType.LEFT_OUTER || joinType == 
JoinType.FULL_OUTER) {
-  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-// Mark the left row as being successfully joined and 
emitted.
-tuple.f1 = true
-markEmitted = true
+if (joinCollector.emitted) {
--- End diff --

same as for `processElement1()`


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307886#comment-16307886
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5140
  
Hi @fhueske, thanks for your review. I've made the following changes to the 
PR.
1. Fixes the "wrong sides" problem in `TimeBoundedStreamJoin`.
2. Adds the logic for outer-joins with negative window size in 
`DataStreamWindowJoin`.
3. Refines the `EmitAwareCollector` according to your suggestions.
4. Uses a separate class `OuterJoinPaddingUtil` to deal with results 
padding.
5. Adds some test cases to `JoinITCase` and `JoinHarnessTest`.
6. Other minor changes to attribute/method/class names.

Thanks, Xingcan


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306321#comment-16306321
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= 
rightQualifiedUpperBound) {
   val rightRows = rightEntry.getValue
   var i = 0
+  var markEmitted = false
   while (i < rightRows.size) {
-joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+joinCollector.resetThisTurn()
+val tuple = rightRows.get(i)
+joinFunction.join(leftRow, tuple.f0, joinCollector)
+if (joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER) {
+  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+// Mark the right row as being successfully joined and 
emitted.
+tuple.f1 = true
+markEmitted = true
+  }
+}
 i += 1
   }
+  if (markEmitted) {
+// Write back the edited entry (mark emitted) for the right 
cache.
+rightEntry.setValue(rightRows)
+  }
 }
 
 if (rightTime <= rightExpirationTime) {
+  if (joinType == JoinType.LEFT_OUTER || joinType == 
JoinType.FULL_OUTER) {
--- End diff --

Yes, I should have added a harness test for that.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306318#comment-16306318
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
+
+  private var emitted = false
+  private var emittedThisTurn = false
+  private var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
--- End diff --

I'll add a function to set this value in the Collector.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305966#comment-16305966
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159023858
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
 s"${joinConditionToString(schema.relDataType, joinCondition, 
getExpressionString)}), " +
 s"join: (${joinSelectionToString(schema.relDataType)})"
 
-joinType match {
-  case JoinRelType.INNER =>
-if (relativeWindowSize < 0) {
-  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
-" please check the join conditions.")
-  createEmptyInnerJoin(leftDataStream, rightDataStream, 
returnTypeInfo)
-} else {
-  if (isRowTime) {
-createRowTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  } else {
-createProcTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  }
-}
-  case JoinRelType.FULL =>
-throw new TableException(
-  "Full join between stream and stream is not supported yet.")
-  case JoinRelType.LEFT =>
-throw new TableException(
-  "Left join between stream and stream is not supported yet.")
-  case JoinRelType.RIGHT =>
-throw new TableException(
-  "Right join between stream and stream is not supported yet.")
+val flinkJoinType = joinType match {
+  case JoinRelType.INNER => JoinType.INNER
+  case JoinRelType.FULL => JoinType.FULL_OUTER
+  case JoinRelType.LEFT => JoinType.LEFT_OUTER
+  case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+}
+
+if (relativeWindowSize < 0) {
+  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
+" please check the join conditions.")
+  createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --

Yes, your are right. I'll add this part.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305847#comment-16305847
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159009795
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -62,15 +65,23 @@ abstract class TimeBoundedStreamInnerJoin(
   with Compiler[FlatJoinFunction[Row, Row, Row]]
   with Logging {
 
-  private var cRowWrapper: CRowWrappingCollector = _
+  private val leftArity = leftType.getArity
+  private val rightArity = rightType.getArity
+  private val resultArity = leftArity + rightArity
+
+  // two reusable padding results
+  private val leftNullPaddingResult = new Row(resultArity)
--- End diff --

I think we can move the code to generate padded results into a util class 
that can be reused by other joins.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305855#comment-16305855
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012517
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -241,17 +288,66 @@ abstract class TimeBoundedStreamInnerJoin(
 if (leftTime >= leftQualifiedLowerBound && leftTime <= 
leftQualifiedUpperBound) {
   val leftRows = leftEntry.getValue
   var i = 0
+  var markEmitted = false
   while (i < leftRows.size) {
-joinFunction.join(leftRows.get(i), rightRow, cRowWrapper)
+joinCollector.resetThisTurn()
+val tuple = leftRows.get(i)
+joinFunction.join(tuple.f0, rightRow, joinCollector)
+if (joinType == JoinType.LEFT_OUTER || joinType == 
JoinType.FULL_OUTER) {
+  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+// Mark the left row as being successfully joined and 
emitted.
+tuple.f1 = true
+markEmitted = true
+  }
+}
 i += 1
   }
+  if (markEmitted) {
+// Write back the edited entry (mark emitted) for the right 
cache.
+leftEntry.setValue(leftRows)
+  }
 }
 if (leftTime <= leftExpirationTime) {
+  if (joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER) {
--- End diff --

`JoinType.RIGHT_OUTER` should be `JoinType.LEFT_OUTER`


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305851#comment-16305851
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r158737260
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
 s"${joinConditionToString(schema.relDataType, joinCondition, 
getExpressionString)}), " +
 s"join: (${joinSelectionToString(schema.relDataType)})"
 
-joinType match {
-  case JoinRelType.INNER =>
-if (relativeWindowSize < 0) {
-  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
-" please check the join conditions.")
-  createEmptyInnerJoin(leftDataStream, rightDataStream, 
returnTypeInfo)
-} else {
-  if (isRowTime) {
-createRowTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  } else {
-createProcTimeInnerJoin(
-  leftDataStream,
-  rightDataStream,
-  returnTypeInfo,
-  joinOpName,
-  joinFunction.name,
-  joinFunction.code,
-  leftKeys,
-  rightKeys
-)
-  }
-}
-  case JoinRelType.FULL =>
-throw new TableException(
-  "Full join between stream and stream is not supported yet.")
-  case JoinRelType.LEFT =>
-throw new TableException(
-  "Left join between stream and stream is not supported yet.")
-  case JoinRelType.RIGHT =>
-throw new TableException(
-  "Right join between stream and stream is not supported yet.")
+val flinkJoinType = joinType match {
+  case JoinRelType.INNER => JoinType.INNER
+  case JoinRelType.FULL => JoinType.FULL_OUTER
+  case JoinRelType.LEFT => JoinType.LEFT_OUTER
+  case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+}
+
+if (relativeWindowSize < 0) {
+  LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
+" please check the join conditions.")
+  createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --

Empty outer joins need to be handled differently than empty inner joins 
because the records of the outer side(s) must be preserved and padded with 
nulls. Hence, we need to pass the join type and the generated code.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305850#comment-16305850
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -382,6 +498,29 @@ abstract class TimeBoundedStreamInnerJoin(
 }
   }
 
+  /**
+* Return a padding result with the given left/right row.
+* @param row the row to pad
+* @param paddingLeft pad left or right
+* @return the null padding result
+*/
+  private def paddingResult(row: Row, paddingLeft: Boolean): Row = {
--- End diff --

Move this method into a util class and split it into two method (`padLeft` 
and `padRight`). The methods should be `final`.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305856#comment-16305856
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159013510
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StreamWindowJoinHarnessTest.scala
 ---
@@ -20,20 +20,25 @@ package org.apache.flink.table.runtime.harness
 import java.lang.{Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import 
org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.api.Types
 import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
 RowResultSortComparatorWithWatermarks, TupleRowKeySelector}
-import 
org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, 
RowTimeBoundedStreamInnerJoin}
+import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamJoin, 
RowTimeBoundedStreamJoin}
 import 
org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
-class JoinHarnessTest extends HarnessTestBase {
+/**
+  * Since the runtime logic for different stream window joins are 
identical, we only test on
+  * inner join.
+  */
+class StreamWindowJoinHarnessTest extends HarnessTestBase {
--- End diff --

We should also add harness tests for the outer joins. These are the only 
tests that can test certain edge cases because the order of inputs can be 
precisely controlled.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305854#comment-16305854
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159011797
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
--- End diff --

I would make this class simpler and move some of the logic into the join 
class.
Instead of tracking if something was emitted across multiple resets, I'd 
just check if something was emitted since a single reset. The join can have a 
flag that checks the input row was emitted by joining against the state. 
Moreover, I'd make some variables public and remove the accessors to reduce the 
number of method calls.

If we do this, we don't need 

- `setCollector` (can be set by directly modifying the public var)
- `emittedThisTurn` (we only need one emission flag)
- `resetThisTurn()` (we only need one emission flag)
- `everEmitted` (emitted can be directly accessed as public var)
- `everEmittedThisTurn` (only one emission flag)
- `collectWithoutNotifying` (we can simply emit because we don't have an 
emission flag across multiple resets)


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305859#comment-16305859
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159013964
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
 StreamITCase.compareWithList(expected)
   }
 
+  // Tests for left outer join
+  @Test
+  def testProcTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+env.setParallelism(1)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+|t2.proctime + INTERVAL '3' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+  }
+
+  @Test
+  def testRowTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.clear
+
+val sqlQuery =
+  """
+|SELECT t2.key, t2.id, t1.id
+|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+|  t1.key = t2.key AND
+|  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+|t2.rt + INTERVAL '6' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(String, String, Long)]
+// for boundary test
+data1.+=(("A", "L-1", 1000L))
+data1.+=(("A", "L-2", 2000L))
+data1.+=(("B", "L-4", 4000L))
+data1.+=(("A", "L-6", 6000L))
+data1.+=(("C", "L-7", 7000L))
+data1.+=(("A", "L-10", 1L))
+data1.+=(("A", "L-12", 12000L))
+data1.+=(("A", "L-20", 2L))
+
+val data2 = new mutable.MutableList[(String, String, Long)]
--- End diff --

Add a row to the right data set such that one left row joins with two right 
rows.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305857#comment-16305857
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159014072
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
 StreamITCase.compareWithList(expected)
   }
 
+  // Tests for left outer join
+  @Test
+  def testProcTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+env.setParallelism(1)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+|t2.proctime + INTERVAL '3' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+  .select('a, 'b, 'c, 'proctime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+  }
+
+  @Test
+  def testRowTimeLeftOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+StreamITCase.clear
+
+val sqlQuery =
+  """
+|SELECT t2.key, t2.id, t1.id
+|FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+|  t1.key = t2.key AND
+|  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+|t2.rt + INTERVAL '6' SECOND
+|""".stripMargin
+
+val data1 = new mutable.MutableList[(String, String, Long)]
+// for boundary test
+data1.+=(("A", "L-1", 1000L))
+data1.+=(("A", "L-2", 2000L))
+data1.+=(("B", "L-4", 4000L))
+data1.+=(("A", "L-6", 6000L))
+data1.+=(("C", "L-7", 7000L))
+data1.+=(("A", "L-10", 1L))
+data1.+=(("A", "L-12", 12000L))
+data1.+=(("A", "L-20", 2L))
+
+val data2 = new mutable.MutableList[(String, String, Long)]
+data2.+=(("A", "R-6", 6000L))
+data2.+=(("B", "R-7", 7000L))
+data2.+=(("D", "R-8", 8000L))
+
+val t1 = env.fromCollection(data1)
+  .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+  .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+val t2 = env.fromCollection(data2)
+  .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+  .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+val expected = new java.util.ArrayList[String]
+expected.add("A,R-6,L-1")
+expected.add("A,R-6,L-2")
+expected.add("A,R-6,L-6")
+expected.add("A,R-6,L-10")
+expected.add("A,R-6,L-12")
+expected.add("B,R-7,L-4")
+expected.add("null,null,L-7")
+expected.add("null,null,L-20")
+StreamITCase.compareWithList(expected)
+  }
+
+  // Tests for right outer join
+  @Test
+  def testProcTimeRightOuterJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+env.setParallelism(1)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.proctime BETWEEN t2.proctime - I

[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305852#comment-16305852
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159011164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= 
rightQualifiedUpperBound) {
   val rightRows = rightEntry.getValue
   var i = 0
+  var markEmitted = false
   while (i < rightRows.size) {
-joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+joinCollector.resetThisTurn()
+val tuple = rightRows.get(i)
+joinFunction.join(leftRow, tuple.f0, joinCollector)
+if (joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER) {
+  if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+// Mark the right row as being successfully joined and 
emitted.
+tuple.f1 = true
+markEmitted = true
+  }
+}
 i += 1
   }
+  if (markEmitted) {
+// Write back the edited entry (mark emitted) for the right 
cache.
+rightEntry.setValue(rightRows)
+  }
 }
 
 if (rightTime <= rightExpirationTime) {
+  if (joinType == JoinType.LEFT_OUTER || joinType == 
JoinType.FULL_OUTER) {
--- End diff --

This should be `joinType == JoinType.RIGHT_OUTER || joinType == 
JoinType.FULL_OUTER` because we preserve the records of the right side.

This should be covered by a harness test.


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305858#comment-16305858
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012674
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -268,13 +364,16 @@ abstract class TimeBoundedStreamInnerJoin(
   ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
   out: Collector[CRow]): Unit = {
 
+joinCollector.setCollector(out)
+joinCollector.reset()
--- End diff --

No need to reset


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305853#comment-16305853
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159010195
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -143,29 +172,14 @@ abstract class TimeBoundedStreamInnerJoin(
   ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
   out: Collector[CRow]): Unit = {
 
+joinCollector.setCollector(out)
--- End diff --

Directly set the variable to avoid the method call (like 
`CRowWrappingCollector`).


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305849#comment-16305849
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159010693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
 if (rightTime >= rightQualifiedLowerBound && rightTime <= 
rightQualifiedUpperBound) {
   val rightRows = rightEntry.getValue
   var i = 0
+  var markEmitted = false
--- End diff --

rename to `entryUpdated`


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305860#comment-16305860
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159014190
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -133,6 +149,19 @@ abstract class TimeBoundedStreamInnerJoin(
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
   new ValueStateDescriptor[Long]("InnerJoinRightTimerState", 
classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
+
+
+// Initialize the two reusable padding results.
+var i = 0
+while (i < leftArity) {
--- End diff --

Move to util class


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305848#comment-16305848
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r158738627
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
+
+  private var emitted = false
+  private var emittedThisTurn = false
+  private var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
--- End diff --

explicitly set `change` value of`CRow`


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285756#comment-16285756
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5140
  
Thanks for the PR @xccui.
I'll try to have a look at it sometime this week.

Best, Fabian


> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284543#comment-16284543
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5140

[FLINK-7797] [table] Add support for windowed outer joins for streaming 
tables

## What is the purpose of the change

This PR adds support for windowed outer joins for streaming tables.

## Brief change log

  - Adjusts the plan translation logic to accept stream window outer join.
  - Adheres an ever emitted flag to each row. When a row is removed from 
the cache (or detected as not cached), a null padding join result will be 
emitted if necessary.
  - Adds a custom `JoinAwareCollector` to track whether there's a 
successfully joined result for both sides in each join loop.
  - Adds table/SQL translation tests, and also join integration tests. 
Since the runtime logic is built on the existing window inner join, no new 
harness tests are added.
 - Updates the SQL/Table API docs.

## Verifying this change

This PR can be verified by the cases added in `JoinTest` and `JoinITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (**yes**)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (**removes the restriction 
notes**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7797

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5140


commit 34d3fde8049ec407849b61901acd8258a6a1f919
Author: Xingcan Cui 
Date:   2017-12-07T17:28:40Z

[FLINK-7797] [table] Add support for windowed outer joins for streaming 
tables




> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276856#comment-16276856
 ] 

Fabian Hueske commented on FLINK-7797:
--

This sounds good! 
+1 for adding the boolean flag. I think we can have this also for inner joins 
to reduce code duplication as it adds only very little overhead.

Fabian

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-04 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276772#comment-16276772
 ] 

Xingcan Cui commented on FLINK-7797:


Hi [~fhueske], I'd like to share my basic thoughts about the implementation. 

For each cached record, an extra boolean flag will be added to indicate whether 
the record has ever been successfully joined. We need a custom data collector 
for the internal join function to update this flag. When a record is removed 
from the cache and its flag is {{false}}, a null padding result will be emitted.

Since the majority codes for the window inner join could be reused, maybe it's 
better to do some refactorings first. 

Other details will be considered (and discussed) during the implementation. 
What do you think of the plan?

Best, Xingcan

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-11-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262185#comment-16262185
 ] 

Fabian Hueske commented on FLINK-7797:
--

Excellent! Thank you! :-)

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-11-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262178#comment-16262178
 ] 

Xingcan Cui commented on FLINK-7797:


Hi [~fhueske], I'd like to work on it. :D

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-11-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262171#comment-16262171
 ] 

Fabian Hueske commented on FLINK-7797:
--

[~xccui], you implemented the INNER JOIN. Do you want to work on this issue for 
Flink 1.5?

Thanks, Fabian

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)