[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655138#comment-16655138
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski closed pull request #6776: [FLINK-9715][table] Support temporal join 
with event time
URL: https://github.com/apache/flink/pull/6776
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
index d54fd785862..3a38bc58800 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -109,7 +109,7 @@ class DataStreamJoin(
 val joinTranslator = createTranslator(tableEnv)
 
 val joinOpName = joinToString(getRowType, joinCondition, joinType, getExpressionString)
-val coProcessFunction = joinTranslator.getCoProcessFunction(
+val joinOperator = joinTranslator.getJoinOperator(
   joinType,
   schema.fieldNames,
   ruleDescription,
@@ -118,9 +118,10 @@ class DataStreamJoin(
   .keyBy(
 joinTranslator.getLeftKeySelector(),
 joinTranslator.getRightKeySelector())
-  .process(coProcessFunction)
-  .name(joinOpName)
-  .returns(CRowTypeInfo(schema.typeInfo))
+  .transform(
+joinOpName,
+CRowTypeInfo(schema.typeInfo),
+joinOperator)
   }
 
   private def validateKeyTypes(): Unit = {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
index 054476a7036..e8f9ff4efdb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
@@ -58,11 +59,11 @@ class DataStreamJoinToCoProcessTranslator(
   rightSchema.projectedTypeInfo(joinInfo.rightKeys.toIntArray))
   }
 
-  def getCoProcessFunction(
+  def getJoinOperator(
   joinType: JoinRelType,
   returnFieldNames: Seq[String],
   ruleDescription: String,
-  queryConfig: StreamQueryConfig): CoProcessFunction[CRow, CRow, CRow] = {
+  queryConfig: StreamQueryConfig): TwoInputStreamOperator[CRow, CRow, CRow] = {
 // input must not be nullable, because the runtime join function will make sure
 // the code-generated function won't process null inputs
 val generator = new FunctionCodeGenerator(
@@ -97,16 +98,16 @@ class DataStreamJoinToCoProcessTranslator(
   body,
   returnType)
 
-createCoProcessFunction(joinType, queryConfig, genFunction)
+createJoinOperator(joinType, queryConfig, genFunction)
   }
 
-  protected def createCoProcessFunction(
+  protected def createJoinOperator(
 joinType: JoinRelType,
 queryConfig: StreamQueryConfig,
 genFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
-: CoProcessFunction[CRow, CRow, CRow] = {
+: TwoInputStreamOperator[CRow, CRow, CRow] = {
 
-joinType match {
+val joinFunction = joinType match {
   case JoinRelType.INNER =>
 new NonWindowInnerJoin(
   leftSchema.typeInfo,
@@ -145,5 +146,6 @@ class DataStreamJoinToCoProcessTranslator(
   genFunction.code,
   queryConfig)
 }
+new KeyedCoProcessOperator(joinFunction)
   }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655135#comment-16655135
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on issue #6776: [FLINK-9715][table] Support temporal join 
with event time
URL: https://github.com/apache/flink/pull/6776#issuecomment-430985221
 
 
   Thanks for the review! Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
>                 Key: FLINK-9715
>                 URL: https://issues.apache.org/jira/browse/FLINK-9715
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time
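
For readers unfamiliar with the semantics the query above asks for, here is a minimal sketch in plain Scala collections (no Flink involved). It assumes that each order is matched with the latest rate of the same currency whose rowtime is not later than the order's rowtime, and that an order without any such rate produces no output. The `Order` and `Rate` case classes and the `VersionedJoinSketch` object are made up for illustration and are not part of the PR.

```scala
// Hypothetical row types for illustration only; these are not Flink classes.
final case class Order(rowtime: Long, currency: String, amount: Double)
final case class Rate(rowtime: Long, currency: String, rate: Double)

object VersionedJoinSketch {
  /** Computes `o.amount * r.rate` for every order, using the rate version valid at `o.rowtime`. */
  def join(orders: Seq[Order], rates: Seq[Rate]): Seq[Double] =
    orders.flatMap { o =>
      // All versions of the matching currency that already existed at the order's event time.
      val applicable = rates.filter(r => r.currency == o.currency && r.rowtime <= o.rowtime)
      // Take the most recent applicable version; no version means no output row (assumed inner join).
      applicable.sortBy(_.rowtime).lastOption.map(r => o.amount * r.rate)
    }
}
```

This is a batch-style restatement of the expected result only. The streaming operator discussed in the PR has to reach the same answer incrementally, which is why it buffers rows until a watermark arrives.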



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653705#comment-16653705
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225970725
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -272,24 +264,30 @@ class TemporalRowtimeJoin(
 * Binary search `rightRowsSorted` to find the latest right row to join with `leftTime`.
 * Latest means a right row with largest time that is still smaller or equal to `leftTime`.
 *
-* @return index of such element. If such row was not found (either `rightRowsSorted` is empty
-* or all `rightRowsSorted` are newer) return -1.
+* @return found element or `Option.empty` if such row was not found (either `rightRowsSorted`
+* is empty or all `rightRowsSorted` are newer).
 */
-  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Int = {
+  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Option[Row] = {
 
 Review comment:
   `Option` > `NullPointerException` :( Do not use nulls without working 
`@Nullable` annotations
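
To make the `Option` variant concrete, here is a rough sketch of a binary search that follows the Javadoc quoted above: it returns the row with the largest time that is still smaller than or equal to `leftTime`, or `None` when the input is empty or every row is newer. This is not the code from the PR. The `VersionedRow` tuple stands in for Flink's `Row`, and the `LatestRowSketch` object and its method name are assumptions made for this example.

```scala
object LatestRowSketch {
  // Hypothetical stand-in for a right-side row: (rowtime, payload).
  type VersionedRow = (Long, String)

  /**
    * Returns the row with the largest rowtime <= leftTime, or None if there is no such row.
    * `rowsSorted` must be sorted by rowtime in ascending order.
    */
  def latestRowToJoin(rowsSorted: IndexedSeq[VersionedRow], leftTime: Long): Option[VersionedRow] = {
    var low = 0
    var high = rowsSorted.length - 1
    var found: Option[VersionedRow] = None
    while (low <= high) {
      val mid = low + (high - low) / 2
      if (rowsSorted(mid)._1 <= leftTime) {
        // Valid candidate; a later valid row may still exist to the right.
        found = Some(rowsSorted(mid))
        low = mid + 1
      } else {
        // Row is newer than leftTime; only rows to the left can qualify.
        high = mid - 1
      }
    }
    found
  }
}
```

With `Option` the caller has to handle the "not found" case explicitly, for example via `latestRowToJoin(rows, t).foreach(emit)`, whereas a `null` return relies on the caller remembering to check.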




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653369#comment-16653369
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225877956
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -272,24 +264,30 @@ class TemporalRowtimeJoin(
 * Binary search `rightRowsSorted` to find the latest right row to join with `leftTime`.
 * Latest means a right row with largest time that is still smaller or equal to `leftTime`.
 *
-* @return index of such element. If such row was not found (either `rightRowsSorted` is empty
-* or all `rightRowsSorted` are newer) return -1.
+* @return found element or `Option.empty` if such row was not found (either `rightRowsSorted`
+* is empty or all `rightRowsSorted` are newer).
 */
-  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Int = {
+  private def latestRightRowToJoin(rightRowsSorted: util.List[Row], leftTime: Long): Option[Row] = {
 
 Review comment:
   nit: use `null` instead of option




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653270#comment-16653270
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225844025
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -237,15 +238,17 @@ class TemporalRowtimeJoin(
 * be joined with it later
 */
   private def cleanUpState(timerTimestamp: Long, rightRowsSorted: util.List[Row]) = {
-for (i <- 0 until firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
+var i = 0
+while (i < firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
 
 Review comment:
   Ok changed, however, it shouldn't matter here.
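
For context on the `for` versus `while` change, below is a small sketch of a clean-up loop written with `while`. It is an illustration, not the PR's code: the state is modelled as a `mutable.Map` from rowtime to payload, and `CleanUpSketch`, its `firstIndexToKeep` and its `cleanUpState` are hypothetical helpers written for this example. The rule it assumes is the one from the class comment: drop build-side rows older than the watermark, but always keep the latest row that is not newer than the watermark, because later probe rows may still join with it.

```scala
import scala.collection.mutable

object CleanUpSketch {
  /** Index of the latest rowtime <= watermark (0 if there is none); everything before it can go. */
  def firstIndexToKeep(watermark: Long, rowtimesSorted: IndexedSeq[Long]): Int = {
    var i = 0
    // Advance while the next element is also not newer than the watermark.
    while (i + 1 < rowtimesSorted.length && rowtimesSorted(i + 1) <= watermark) {
      i += 1
    }
    i
  }

  /** Removes all entries that can no longer be joined, keeping the latest valid version. */
  def cleanUpState(watermark: Long, state: mutable.Map[Long, String]): Unit = {
    val rowtimesSorted = state.keys.toIndexedSeq.sorted
    // Evaluate the bound once, as `0 until bound` would in a `for` comprehension.
    val bound = firstIndexToKeep(watermark, rowtimesSorted)
    var i = 0
    while (i < bound) {
      state.remove(rowtimesSorted(i))
      i += 1
    }
  }
}
```

In Scala a `for (i <- 0 until n)` loop goes through `Range.foreach` with a closure, while a `while` loop compiles to a plain jump, which is the usual reason to prefer `while` on per-record paths. The sketch hoists the bound into a `val` so that the loop condition does not recompute it on every iteration.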




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652003#comment-16652003
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225606620
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following eventTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use List 
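
The single-timer-per-key idea from the class comment quoted above can be illustrated with a small model in plain Scala. It is only a sketch under stated assumptions: timers are modelled by an `Option[Long]` instead of Flink's timer service, the class handles exactly one key, and `SingleTimerSketch` with its `onElement`, `onWatermark` and `timersFired` members are hypothetical names that do not appear in the PR.

```scala
import scala.collection.mutable

/** Models the buffered event times and the one registered timer for a single key. */
final class SingleTimerSketch {
  private val pending = mutable.ArrayBuffer.empty[Long] // event times of buffered records
  private var registeredTimer: Option[Long] = None
  var timersFired = 0

  def onElement(eventTime: Long): Unit = {
    pending += eventTime
    // Keep at most one timer, always registered for the smallest buffered event time.
    if (registeredTimer.forall(_ > eventTime)) {
      registeredTimer = Some(eventTime)
    }
  }

  /** Returns the event times processed because of this watermark, in ascending order. */
  def onWatermark(watermark: Long): Seq[Long] =
    registeredTimer match {
      case Some(timer) if timer <= watermark =>
        timersFired += 1
        // One firing handles every record with event time <= watermark.
        val (ready, rest) = pending.partition(_ <= watermark)
        pending.clear()
        pending ++= rest
        // Re-register for the new minimum, if anything is still buffered.
        registeredTimer = if (pending.isEmpty) None else Some(pending.min)
        ready.sorted.toList
      case _ =>
        Seq.empty
    }
}
```

With the event times {1, 2, 5, 8, 9} from the comment, a watermark of 10 fires the timer once and processes all five records, instead of firing five separate timers.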

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651839#comment-16651839
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225570366
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following eventTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651837#comment-16651837
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225570366
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651831#comment-16651831
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225570366
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use 
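
The single-timer idea described in the class comment quoted above can be sketched outside of Flink. This is a minimal illustration, not the operator's code: `SingleTimerSketch`, `onElement`, `onWatermark` and the `firings` counter are hypothetical names, and an in-memory sorted set stands in for keyed state and the timer service of one key.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of one key's buffered records and its timer.
final class SingleTimerSketch {
  // Event times buffered for this key, kept sorted.
  private val pending = mutable.TreeSet.empty[Long]
  // At most one timer is registered, always for the smallest pending time.
  private var registeredTimer: Option[Long] = None
  // Counts how many times the timer callback actually ran.
  var firings = 0

  def onElement(eventTime: Long): Unit = {
    pending += eventTime
    if (registeredTimer.forall(_ > eventTime)) registeredTimer = Some(eventTime)
  }

  // Returns the event times processed by this watermark, in ascending order.
  def onWatermark(watermark: Long): Seq[Long] =
    registeredTimer match {
      case Some(timer) if timer <= watermark =>
        firings += 1
        // One firing handles every record that is due, not just one.
        val due = pending.filter(_ <= watermark).toList
        pending --= due
        // Re-register for the smallest remaining event time, if any.
        registeredTimer = pending.headOption
        due
      case _ => Nil
    }
}
```

With event times {1, 2, 5, 8, 9} buffered, a watermark of 10 leads to a single firing that processes all five records, instead of five separate timer callbacks.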

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651828#comment-16651828
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225570366
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping the collected probe and build records in state and
+  * processing them on the next watermark. Between watermarks we only collect those elements;
+  * once we are sure that there will be no more updates, we emit the correct result and clean
+  * up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it's past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. It is achieved by
+  * registering timers for the keys. We could register a timer for every probe and build side
+  * element's event time (when the watermark exceeds this timer, that's when we emit and/or
+  * clean up the state). However, this would cause a huge number of registered timers. For
+  * example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if
+  * we had received Watermark(10), it would trigger 5 separate timers for the same key. To
+  * avoid that, we always keep only a single registered timer for any given key, registered
+  * for the minimal value. Upon triggering it, we process all records with event times older
+  * than or equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651778#comment-16651778
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225541824
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -145,22 +146,22 @@ class TemporalRowtimeJoin(
   }
 
   override def processElement1(element: StreamRecord[CRow]): Unit = {
-if (!element.getValue.change) {
 
 Review comment:
   We should reintroduce the `if` to make sure that exception creation does not
affect runtime. The JVM might be smart here, but an `if` branch does no harm.
Otherwise an array is created for the arguments.
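
To illustrate the concern in the comment above, here is a small sketch contrasting a varargs precondition call with an explicit branch. Everything in it is an assumption made for illustration: `checkState` is a local stand-in with a varargs signature (not the Flink utility itself), and `Change` is a hypothetical record type replacing the operator's input element.

```scala
object GuardSketch {
  // Local stand-in for a varargs precondition helper. The `args` sequence is
  // built at every call site, even when `condition` holds.
  def checkState(condition: Boolean, template: String, args: Any*): Unit =
    if (!condition) throw new IllegalStateException(template.format(args: _*))

  // Hypothetical element type: `isAccumulate == false` models a retraction.
  final case class Change(isAccumulate: Boolean, payload: String)

  // Variant 1: the helper is invoked for every element.
  def processUnguarded(c: Change): String = {
    checkState(c.isAccumulate, "Retractions are not supported, got: %s", c)
    c.payload
  }

  // Variant 2: an explicit branch keeps argument wrapping and message
  // formatting on the failure path only.
  def processGuarded(c: Change): String = {
    if (!c.isAccumulate) {
      throw new IllegalStateException(s"Retractions are not supported, got: $c")
    }
    c.payload
  }
}
```

Both variants behave the same for valid input; the guarded one simply does not depend on the JIT eliminating the per-call argument allocation.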


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.5.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651777#comment-16651777
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225543261
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -237,15 +238,17 @@ class TemporalRowtimeJoin(
 * be joined with it later
 */
   private def cleanUpState(timerTimestamp: Long, rightRowsSorted: util.List[Row]) = {
-for (i <- 0 until firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
+var i = 0
+while (i < firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
   val rightTime = getRightTime(rightRowsSorted.get(i))
   rightState.remove(rightTime)
+  i += 1
 }
   }
 
   private def firstIndexToKeep(timerTimestamp: Long, rightRowsSorted: util.List[Row]): Int = {
 val firstIndexNewerThenTimer =
-  rightRowsSorted.indexWhere(row => getRightTime(row) > timerTimestamp)
+  indexOf[Row](rightRowsSorted, row => getRightTime(row) > timerTimestamp)
 
 Review comment:
   Inline the Scala closure into `indexOf`.
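
One possible reading of this suggestion, sketched with hypothetical names: replace the generic, predicate-taking helper with a loop that performs the comparison directly, so no function object is passed on the hot path. Plain `java.lang.Long` timestamps stand in for the rows and for `getRightTime`.

```scala
import java.util.{List => JList}

object IndexOfSketch {
  // Generic helper: flexible, but every call passes a function object.
  def indexOf[T](list: JList[T], predicate: T => Boolean): Int = {
    var i = 0
    while (i < list.size()) {
      if (predicate(list.get(i))) return i
      i += 1
    }
    -1
  }

  // Specialised variant with the comparison written inline.
  def firstIndexNewerThan(timestamps: JList[java.lang.Long], timerTimestamp: Long): Int = {
    var i = 0
    while (i < timestamps.size()) {
      if (timestamps.get(i).longValue > timerTimestamp) return i
      i += 1
    }
    -1
  }
}
```

Both functions return the index of the first element newer than the timer, or -1 if there is none.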




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651779#comment-16651779
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225554368
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -237,15 +238,17 @@ class TemporalRowtimeJoin(
 * be joined with it later
 */
   private def cleanUpState(timerTimestamp: Long, rightRowsSorted: util.List[Row]) = {
-for (i <- 0 until firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
+var i = 0
+while (i < firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
 
 Review comment:
   Call `firstIndexToKeep()` only once.
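
A minimal sketch of the requested change: evaluate the loop bound once before the `while` loop instead of in its condition. The names mirror the diff, but the bodies are assumptions for illustration. In particular, the rule inside `firstIndexToKeep` (keep the newest entry that is not newer than the timer, plus everything after it) and the in-memory `state` map are stand-ins, not the pull request's code.

```scala
import java.util.{List => JList}
import scala.collection.mutable

object CleanUpSketch {
  // Assumed rule: keep the newest entry at or before the timer and all
  // newer entries; `sortedTimes` must be sorted in ascending order.
  def firstIndexToKeep(timerTimestamp: Long, sortedTimes: JList[java.lang.Long]): Int = {
    var i = 0
    while (i < sortedTimes.size() && sortedTimes.get(i).longValue <= timerTimestamp) {
      i += 1
    }
    math.max(i - 1, 0)
  }

  // `state` is a hypothetical in-memory replacement for keyed map state.
  def cleanUpState(
      timerTimestamp: Long,
      sortedTimes: JList[java.lang.Long],
      state: mutable.Map[Long, String]): Unit = {
    // The bound is computed a single time, not on every loop iteration.
    val firstToKeep = firstIndexToKeep(timerTimestamp, sortedTimes)
    var i = 0
    while (i < firstToKeep) {
      state.remove(sortedTimes.get(i).longValue)
      i += 1
    }
  }
}
```

Since the bound itself requires a scan of the sorted list, hoisting it turns a potentially quadratic clean-up into a linear one.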




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651676#comment-16651676
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225536215
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651651#comment-16651651
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

fhueske commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225530690
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651595#comment-16651595
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225514886
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use List 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651584#comment-16651584
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225511761
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use List 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651433#comment-16651433
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225477141
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator keeps collections of probe and build records in state and processes them
+  * on the next watermark. Between watermarks we only collect those elements; once we are
+  * sure that no more updates can arrive, we emit the correct result and clean up the state.
+  *
+  * Cleaning up the state drops all "old" values from the probe side, where "old" means
+  * older than the current watermark. The build side is cleaned up in a similar fashion,
+  * but we always keep at least one record (the latest one), even if the last watermark
+  * has already passed it.
+  *
+  * One more trick is how emitting results and cleaning up are triggered: by registering
+  * timers for the keys. We could register a timer for the event time of every probe and
+  * build side element (when the watermark passes the timer, we emit and/or clean up the
+  * state). However, this would create a huge number of registered timers. For example,
+  * with accumulated probe records with event times {1, 2, 5, 8, 9}, receiving
+  * Watermark(10) would trigger 5 separate timers for the same key. To avoid that, we keep
+  * only a single registered timer for any given key, registered for the minimal value.
+  * When it fires, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use 
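
The single-timer scheme described in the class comment above can be illustrated with a small, Flink-free sketch. The names `TimerCoalescingSketch`, `KeyState`, `add`, `onWatermark`, `timer` and `timerFirings` are hypothetical and exist only for illustration; they are not part of the PR or of Flink's timer API. The sketch also assumes that a timer fires once the watermark is greater than or equal to its timestamp.

```scala
object TimerCoalescingSketch {

  /** Hypothetical per-key state: buffered probe event times plus at most one timer. */
  final class KeyState {
    private var pending: Vector[Long] = Vector.empty
    private var registeredTimer: Option[Long] = None
    private var firings: Int = 0

    /** Buffers a record and keeps the single timer at the minimal event time. */
    def add(eventTime: Long): Unit = {
      pending = pending :+ eventTime
      // Move the timer only when the new record is earlier than the current one.
      if (registeredTimer.forall(eventTime < _)) {
        registeredTimer = Some(eventTime)
      }
    }

    /** Fires the timer at most once and returns every record at or before the watermark. */
    def onWatermark(watermark: Long): Vector[Long] =
      registeredTimer match {
        case Some(timer) if timer <= watermark =>
          firings += 1
          val (ready, rest) = pending.partition(_ <= watermark)
          pending = rest
          // Re-register for the minimal remaining event time, if any.
          registeredTimer = if (rest.isEmpty) None else Some(rest.min)
          ready.sorted
        case _ =>
          Vector.empty
      }

    def timer: Option[Long] = registeredTimer
    def timerFirings: Int = firings
  }
}
```

With the event times {1, 2, 5, 8, 9} from the comment, a single call to `onWatermark(10L)` drains all five records with one firing, where one timer per record would have fired five times.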

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651432#comment-16651432
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225477141
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651426#comment-16651426
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225475981
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651379#comment-16651379
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459775
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651380#comment-16651380
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459329
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647867#comment-16647867
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224769626
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator keeps collections of probe and build records in state and processes them
+  * on the next watermark. Between watermarks we only collect those elements; once we are
+  * sure that no more updates can arrive, we emit the correct result and clean up the state.
+  *
+  * Cleaning up the state drops all "old" values from the probe side, where "old" means
+  * older than the current watermark. The build side is cleaned up in a similar fashion,
+  * but we always keep at least one record (the latest one), even if the last watermark
+  * has already passed it.
+  *
+  * One more trick is how emitting results and cleaning up are triggered: by registering
+  * timers for the keys. We could register a timer for the event time of every probe and
+  * build side element (when the watermark passes the timer, we emit and/or clean up the
+  * state). However, this would create a huge number of registered timers. For example,
+  * with accumulated probe records with event times {1, 2, 5, 8, 9}, receiving
+  * Watermark(10) would trigger 5 separate timers for the same key. To avoid that, we keep
+  * only a single registered timer for any given key, registered for the minimal value.
+  * When it fires, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use List to accumulate Rows, because we 
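
The cleanup rule from the class comment (drop old probe records, but always retain the latest build-side record) can be sketched without any Flink dependencies. `StateCleanupSketch`, `cleanProbe` and `cleanBuild` are hypothetical helpers, not code from the PR, and records are reduced to their event times. The sketch assumes that "old" means at or before the watermark, and that the retained build-side record is the latest version at or before the watermark.

```scala
object StateCleanupSketch {

  /** Probe side: records at or before the watermark were already emitted, so drop them. */
  def cleanProbe(eventTimes: Vector[Long], watermark: Long): Vector[Long] =
    eventTimes.filter(_ > watermark)

  /**
    * Build side: keep every version newer than the watermark, plus the latest version
    * at or before it, because later probe records may still need to join against it.
    */
  def cleanBuild(versions: Vector[Long], watermark: Long): Vector[Long] = {
    val (old, fresh) = versions.sorted.partition(_ <= watermark)
    old.lastOption.toVector ++ fresh
  }
}
```

For example, with build-side versions at event times 3, 7 and 11 and a watermark of 10, version 3 is dropped while 7 and 11 stay in state.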

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647707#comment-16647707
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224727893
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping a collection of probe and build records in state, to be
+  * processed on the next watermark. Between watermarks we collect those elements, and once we
+  * are sure that there will be no more updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it is past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that is when we emit
+  * and/or clean up the state). However, this would cause a huge number of registered timers.
+  * For example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9},
+  * if we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+  * that we always keep only a single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) to the left side `Row`.
+* We can not use List to accumulate Rows, because we 
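
The single-timer-per-key idea from the Scaladoc above can be sketched in plain Scala. This is only an illustrative model under stated assumptions, not the operator from the pull request: `SingleTimerPerKey`, `add`, `advanceWatermark` and `timerFirings` are hypothetical names, and no Flink timer service is involved. It shows that the event times {1, 2, 5, 8, 9} followed by Watermark(10) result in one firing that processes all five records.

```scala
// Illustrative only: a plain-Scala model of "one registered timer per key".
// SingleTimerPerKey and its members are hypothetical names, not Flink APIs.
final class SingleTimerPerKey {
  private var pending = Vector.empty[Long]          // buffered event times for this key
  private var registeredTimer: Option[Long] = None  // at most one timer at any time
  private var firings = 0                           // how often the timer has fired

  def timerFirings: Int = firings

  // Buffer a record and keep the single timer at the minimal buffered event time.
  def add(eventTime: Long): Unit = {
    pending = pending :+ eventTime
    registeredTimer = Some(pending.min)
  }

  // Returns the event times processed by this watermark, in ascending order.
  def advanceWatermark(watermark: Long): Vector[Long] =
    if (registeredTimer.exists(_ <= watermark)) {
      firings += 1
      // One firing handles every record with event time <= watermark.
      val (ready, later) = pending.partition(_ <= watermark)
      pending = later
      registeredTimer = if (later.isEmpty) None else Some(later.min)
      ready.sorted
    } else {
      Vector.empty
    }
}

object SingleTimerPerKeyDemo {
  def main(args: Array[String]): Unit = {
    val key = new SingleTimerPerKey
    Seq(1L, 2L, 5L, 8L, 9L).foreach(key.add)
    println(key.advanceWatermark(10L)) // Vector(1, 2, 5, 8, 9)
    println(key.timerFirings)          // 1
  }
}
```

Registering one timer per buffered element would instead fire five times for the same key in this scenario, which is the cost the comment describes.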

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647705#comment-16647705
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224727893
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646603#comment-16646603
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224492754
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645102#comment-16645102
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224112862
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645104#comment-16645104
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224078378
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -60,32 +61,28 @@ class TemporalJoin(
 val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
 rightState = getRuntimeContext.getState(rightStateDescriptor)
 
+collector = new TimestampedCollector[CRow](output)
 cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.out = collector
   }
 
-  override def processElement1(
-  value: CRow,
-  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-  out: Collector[CRow]): Unit = {
+  override def processElement1(element: StreamRecord[CRow]): Unit = {
 
 if (rightState.value() == null) {
   return
 }
 
-cRowWrapper.out = out
-cRowWrapper.setChange(value.change)
+cRowWrapper.setChange(element.getValue.change)
+collector.setTimestamp(element)
 
 Review comment:
   I removed this line


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time
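
As a rough illustration of what the query above is expected to compute with event time, the following plain-Scala sketch joins each order with the rate version that was valid at the order's rowtime, that is, the latest rate for the same currency whose rowtime is not later than the order's. The types `Order` and `Rate`, the helper `rateAsOf`, and the sample values are assumptions made for this sketch. It models only the semantics on in-memory collections, not the streaming operator.

```scala
// Illustrative only: event-time versioned join semantics on in-memory data.
// Order, Rate and rateAsOf are hypothetical names chosen for this sketch.
final case class Rate(currency: String, rowtime: Long, rate: Double)
final case class Order(currency: String, rowtime: Long, amount: Double)

object VersionedJoinSketch {
  // The rate version valid at `time`: the latest one with rowtime <= time.
  def rateAsOf(rates: Seq[Rate], currency: String, time: Long): Option[Rate] = {
    val candidates = rates.filter(r => r.currency == currency && r.rowtime <= time)
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.rowtime))
  }

  // Mirrors SELECT o.amount * r.rate ... WHERE o.currency = r.currency.
  def join(orders: Seq[Order], rates: Seq[Rate]): Seq[Double] =
    orders.flatMap { o =>
      rateAsOf(rates, o.currency, o.rowtime).map(r => o.amount * r.rate)
    }

  def main(args: Array[String]): Unit = {
    val rates = Seq(Rate("EUR", 1L, 1.5), Rate("EUR", 5L, 2.0))
    val orders = Seq(
      Order("EUR", 3L, 10.0), // uses the rate from rowtime 1
      Order("EUR", 7L, 10.0), // uses the rate from rowtime 5
      Order("USD", 3L, 10.0)  // no matching rate, produces no row
    )
    println(join(orders, rates)) // List(15.0, 20.0)
  }
}
```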



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645103#comment-16645103
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224112920
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644960#comment-16644960
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r224075507
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -20,24 +20,24 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingCollector
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 
-class TemporalJoin(
+class TemporalProcessTimeJoin(
 
 Review comment:
   Yes, we should, and it is under development. It will be covered in a separate PR.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641009#comment-16641009
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r223206553
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use List to accumulate Rows, because we 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641007#comment-16641007
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r223206551
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -60,32 +61,28 @@ class TemporalJoin(
 val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
 rightState = getRuntimeContext.getState(rightStateDescriptor)
 
+collector = new TimestampedCollector[CRow](output)
 cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.out = collector
   }
 
-  override def processElement1(
-  value: CRow,
-  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-  out: Collector[CRow]): Unit = {
+  override def processElement1(element: StreamRecord[CRow]): Unit = {
 
 if (rightState.value() == null) {
   return
 }
 
-cRowWrapper.out = out
-cRowWrapper.setChange(value.change)
+cRowWrapper.setChange(element.getValue.change)
+collector.setTimestamp(element)
 
 Review comment:
   Why call this function? The value of StreamRecord timestamp has already been 
erased. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---------------------------------------
>
>                 Key: FLINK-9715
>                 URL: https://issues.apache.org/jira/browse/FLINK-9715
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time
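
As a rough illustration of what such a query is expected to compute under event time, here is a minimal sketch in plain Scala (no Flink). The `Order` and `Rate` case classes, the `VersionedJoinSketch` object and the sample values are hypothetical and not taken from the issue. The only assumption about semantics is that each order joins with the latest rate version whose rowtime is not later than the order's rowtime, and that orders without such a version produce no output.

```scala
// Hypothetical record types, for illustration only.
final case class Order(rowtime: Long, currency: String, amount: Double)
final case class Rate(rowtime: Long, currency: String, rate: Double)

object VersionedJoinSketch {
  /** Joins each order with the rate version valid at the order's event time. */
  def join(orders: Seq[Order], rates: Seq[Rate]): Seq[Double] =
    orders.flatMap { o =>
      rates
        .filter(r => r.currency == o.currency && r.rowtime <= o.rowtime)
        .sortBy(_.rowtime)
        .lastOption // latest version that is not newer than the order
        .map(r => o.amount * r.rate)
    }
}
```

With rate versions at rowtimes 1, 5 and 9, an order with rowtime 7 would pick the version from rowtime 5, regardless of the order in which the records arrive.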



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641010#comment-16641010
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r223206547
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -20,24 +20,24 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingCollector
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 
-class TemporalJoin(
+class TemporalProcessTimeJoin(
 
 Review comment:
   Should we also support state retention time for Temporal proctime join? 
State size may grow infinitely.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641008#comment-16641008
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

hequn8128 commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r223206555
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use List to accumulate Rows, because we 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16637913#comment-16637913
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r222564049
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -210,32 +215,46 @@ class TemporalRowtimeJoin(
 if (rightRowIndex >= 0) {
   val rightRow = rightRowsSorted.get(rightRowIndex)
 
-  cRowWrapper.setChange(true)
-  collector.setAbsoluteTimestamp(leftTime)
   joinFunction.join(leftRow, rightRow, cRowWrapper)
 }
 leftIterator.remove()
   }
   else {
-lastUnprocessedTime = Some(
-  Math.min(
-lastUnprocessedTime.getOrElse(Long.MaxValue),
-leftTime))
+lastUnprocessedTime = Math.min(lastUnprocessedTime, leftTime)
   }
 }
 
-// remove all right entries older then the watermark, except the latest one
-// for example  with rightState = [1, 5, 9] and watermark = 6
-// we can not remove "5" from rightState, because left elements with rowtime of 7 or 8
-// could be joined with it later
-rightRowsSorted.map(rightRow => getRightTime(rightRow))
-  .filter(rightTime => rightTime <= timerTimestamp)
-  .dropRight(1)
-  .foreach(rightKey => rightState.remove(rightKey))
-
+cleanUpState(timerTimestamp, rightRowsSorted)
 lastUnprocessedTime
   }
 
+  /**
+* Removes all right entries older than the watermark, except the latest one. For example with:
+* rightState = [1, 5, 9]
+* and
+* watermark = 6
+* we can not remove "5" from rightState, because left elements with rowtime of 7 or 8 could
+* be joined with it later
+*/
+  private def cleanUpState(timerTimestamp: Long, rightRowsSorted: util.List[Row]) = {
+for (i <- 0 until firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
 
 Review comment:
   Use a while loop instead.
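
A minimal sketch of what a while-loop version might look like, written against plain Java collections so that it runs without Flink. The `CleanUpSketch` object, the `rightTimes` list and the `state` map are hypothetical stand-ins for `rightRowsSorted` and `rightState`, and `firstIndexToKeep` is passed in as an already computed value. This is not the code that was eventually merged.

```scala
import java.util

object CleanUpSketch {
  /**
    * Removes the entries for rightTimes[0 .. firstIndexToKeep) from `state`,
    * using a plain while loop instead of a `for` over a Range.
    */
  def cleanUpState(
      firstIndexToKeep: Int,
      rightTimes: util.List[java.lang.Long],
      state: util.Map[java.lang.Long, String]): Unit = {
    var i = 0
    while (i < firstIndexToKeep) {
      state.remove(rightTimes.get(i))
      i += 1
    }
  }
}
```

The request is presumably about avoiding the Range and closure that a Scala `for` over `0 until n` creates on a per-timer code path; the behaviour of the two loops is the same.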




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16637912#comment-16637912
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r222564861
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -210,32 +215,46 @@ class TemporalRowtimeJoin(
 if (rightRowIndex >= 0) {
   val rightRow = rightRowsSorted.get(rightRowIndex)
 
-  cRowWrapper.setChange(true)
-  collector.setAbsoluteTimestamp(leftTime)
   joinFunction.join(leftRow, rightRow, cRowWrapper)
 }
 leftIterator.remove()
   }
   else {
-lastUnprocessedTime = Some(
-  Math.min(
-lastUnprocessedTime.getOrElse(Long.MaxValue),
-leftTime))
+lastUnprocessedTime = Math.min(lastUnprocessedTime, leftTime)
   }
 }
 
-// remove all right entries older then the watermark, except the latest one
-// for example  with rightState = [1, 5, 9] and watermark = 6
-// we can not remove "5" from rightState, because left elements with rowtime of 7 or 8
-// could be joined with it later
-rightRowsSorted.map(rightRow => getRightTime(rightRow))
-  .filter(rightTime => rightTime <= timerTimestamp)
-  .dropRight(1)
-  .foreach(rightKey => rightState.remove(rightKey))
-
+cleanUpState(timerTimestamp, rightRowsSorted)
 lastUnprocessedTime
   }
 
+  /**
+* Removes all right entries older than the watermark, except the latest one. For example with:
+* rightState = [1, 5, 9]
+* and
+* watermark = 6
+* we can not remove "5" from rightState, because left elements with rowtime of 7 or 8 could
+* be joined with it later
+*/
+  private def cleanUpState(timerTimestamp: Long, rightRowsSorted: util.List[Row]) = {
+for (i <- 0 until firstIndexToKeep(timerTimestamp, rightRowsSorted)) {
+  val rightTime = getRightTime(rightRowsSorted.get(i))
+  rightState.remove(rightTime)
+}
+  }
+
+  private def firstIndexToKeep(timerTimestamp: Long, rightRowsSorted: util.List[Row]): Int = {
+val firstIndexNewerThenTimer =
+  rightRowsSorted.indexWhere(row => getRightTime(row) > timerTimestamp)
 
 Review comment:
   You are using implicit Java<->Scala conversion here such that `indexWhere` 
can be used. So my original concern does still exist.
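
One way to avoid the implicit conversion would be an explicit index loop over the Java list. The sketch below is only an illustration under assumptions: `FirstIndexSketch` is a hypothetical name, the list holds bare timestamps instead of `Row`s, and the "keep the latest entry not newer than the timer" rule is taken from the Scaladoc of `cleanUpState` in the hunk above rather than from the merged code.

```scala
import java.util

object FirstIndexSketch {
  /**
    * Index of the first right-side entry that must be kept: the latest entry
    * whose time is <= timerTimestamp, so later left rows can still join with it.
    * `rightTimesSorted` must be sorted ascending.
    */
  def firstIndexToKeep(
      timerTimestamp: Long,
      rightTimesSorted: util.List[java.lang.Long]): Int = {
    // Explicit index loop over the Java list; no JavaConversions wrapper is created.
    var firstNewer = 0
    while (firstNewer < rightTimesSorted.size() &&
        rightTimesSorted.get(firstNewer).longValue() <= timerTimestamp) {
      firstNewer += 1
    }
    // Keep the entry just before the first newer one, if there is one.
    math.max(0, firstNewer - 1)
  }
}
```

For the example from the Scaladoc, times [1, 5, 9] and a timer at 6, the first index to keep is the one holding 5, so only the entry for 1 would be removed.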




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635718#comment-16635718
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221643699
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635722#comment-16635722
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221627515
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
 
 Review comment:
   Renamed to `TIMERS_STATE_NAME`, since those were the timers affecting both 
left and right side anyways.
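
For context, the single-timer-per-key scheme that the quoted Scaladoc describes can be sketched without Flink roughly as follows. `SingleTimerSketch`, `pending`, `registered` and `timerFirings` are hypothetical names. The real operator keeps this information in keyed state and relies on Flink's timer service, neither of which the sketch models.

```scala
import scala.collection.mutable

/** Tracks, for one key, the buffered event times and at most one registered timer. */
final class SingleTimerSketch {
  private val pending = mutable.SortedSet.empty[Long]
  private var registered: Option[Long] = None
  var timerFirings = 0

  /** Buffers an event time; the single timer is kept at the minimal pending time. */
  def add(eventTime: Long): Unit = {
    pending += eventTime
    registered = Some(pending.head)
  }

  /** Advances the watermark; the single timer fires at most once per call. */
  def onWatermark(watermark: Long): Seq[Long] =
    registered match {
      case Some(t) if t <= watermark =>
        timerFirings += 1
        // Process every buffered record with event time <= watermark in one go.
        val ready = pending.takeWhile(_ <= watermark).toList
        pending --= ready
        // Re-register for the smallest remaining time, if any.
        registered = pending.headOption
        ready
      case _ => Seq.empty
    }
}
```

With the event times {1, 2, 5, 8, 9} from the Scaladoc and a watermark of 10, this processes all five records on a single timer firing instead of five.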



[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635717#comment-16635717
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r222000146
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state a collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635715#comment-16635715
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221626059
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting the results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
 
 Review comment:
   Again, I don't understand what the problem is. `CoProcessOperator` doesn't 
contain any logic besides wrapping/proxying to the user function. Also, what do you 
mean by:
   >  this class uses a lot of internal code.
   
   Regular `CoProcessFunction` and `CoProcessOperator` have fixed (and different) 
semantics for which event time is set on emitted results when processing timers 
(compare the usage of `collector.setAbsoluteTimestamp` in this class with that in 
`KeyedCoProcessOperator`).
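
   For readers following the thread, the single-timer scheme described in the class comment quoted above can be sketched without Flink. This is only an illustration under stated assumptions: `SingleTimerKeyState`, `addProbeRecord`, `onWatermark` and `timerFirings` are hypothetical names, not Flink APIs and not code from this PR, and treating a record as ready when its event time is less than or equal to the watermark is an assumption about the exact boundary.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the state kept for ONE key; not Flink API.
final class SingleTimerKeyState {
  // Buffered probe-side event times (duplicates allowed).
  private val pending = mutable.ArrayBuffer.empty[Long]
  // At most one registered timer per key, always at the minimal pending time.
  private var registeredTimer: Option[Long] = None
  // Counts how many times the (single) timer fired, for illustration only.
  var timerFirings: Int = 0

  def addProbeRecord(eventTime: Long): Unit = {
    pending += eventTime
    // Register a timer only if none exists or the new time is smaller.
    if (registeredTimer.forall(eventTime < _)) registeredTimer = Some(eventTime)
  }

  // Returns the event times processed by this watermark, in ascending order.
  def onWatermark(watermark: Long): List[Long] = registeredTimer match {
    case Some(t) if t <= watermark =>
      timerFirings += 1
      // One firing handles every record the watermark has passed.
      val (ready, rest) = pending.partition(_ <= watermark)
      pending.clear()
      pending ++= rest
      // Re-register for the smallest remaining event time, if any.
      registeredTimer = if (pending.isEmpty) None else Some(pending.min)
      ready.sorted.toList
    case _ => Nil
  }
}
```

   With the event times {1, 2, 5, 8, 9} from the class comment, a single `onWatermark(10)` call counts one firing and returns all five records, whereas one timer per record would have fired five times for the same key.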


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635721#comment-16635721
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221642460
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting the results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635720#comment-16635720
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221988168
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -92,6 +102,186 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val rexBuilder = new RexBuilder(typeFactory)
 
+  @Test
+  def testRowtime() {
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L), 0L))
 
 Review comment:
   I think this is resolved by the comment below?




> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time
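
To make the intended semantics concrete, here is a minimal sketch of the versioned lookup that the query above asks for, written in plain Scala rather than against the Table API. `Rate`, `Order`, `rateAsOf` and `join` are hypothetical names, not part of Flink or of this PR. The sketch assumes that an order joins with the latest rate for its currency whose rowtime is not later than the order's rowtime, and that an order without such a rate produces no output.

```scala
// Hypothetical row types for illustration only; not Flink classes.
final case class Rate(currency: String, rate: Long, rowtime: Long)
final case class Order(amount: Long, currency: String, rowtime: Long)

object VersionedJoinSketch {
  // Latest rate version for the order's currency as of the order's rowtime.
  def rateAsOf(rates: Seq[Rate], order: Order): Option[Rate] =
    rates
      .filter(r => r.currency == order.currency && r.rowtime <= order.rowtime)
      .sortBy(_.rowtime)
      .lastOption

  // Computes o.amount * r.rate for every order that has a matching version;
  // orders with no rate version yet are dropped (inner-join behaviour).
  def join(orders: Seq[Order], rates: Seq[Rate]): Seq[Long] =
    orders.flatMap(o => rateAsOf(rates, o).map(r => o.amount * r.rate))
}
```

In an event-time setting the operator cannot evaluate `rateAsOf` as soon as an order arrives, because a rate with an earlier rowtime may still be in flight. That is why records are buffered until a watermark has passed their rowtime.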



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635719#comment-16635719
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221622010
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -20,24 +20,24 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingCollector
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 
-class TemporalJoin(
+class TemporalProcessTimeJoin(
 leftType: TypeInformation[Row],
 rightType: TypeInformation[Row],
 genJoinFuncName: String,
 genJoinFuncCode: String,
 queryConfig: StreamQueryConfig)
-  extends CoProcessFunction[CRow, CRow, CRow]
+  extends AbstractStreamOperator[CRow]
 
 Review comment:
   The problem is that we need custom timestamp handling in `onEventTime` 
callbacks. As we discussed, modifying `CoProcessFunction` to suit our needs 
would make it more complicated for other users.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635716#comment-16635716
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221638717
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting the results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634008#comment-16634008
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221604828
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -92,6 +102,186 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val rexBuilder = new RexBuilder(typeFactory)
 
+  @Test
+  def testRowtime() {
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L), 0L))
 
 Review comment:
   Btw Stream records should not have timestamps. We rely on the rowtime index.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633993#comment-16633993
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221593180
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to process
+  * on the next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting the results and cleaning up are triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause a huge number of registered timers. For example
+  * with the following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
 
 Review comment:
   Btw why is a regular CoProcessFunction not enough?



[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634021#comment-16634021
 ] 

Hequn Cheng commented on FLINK-9715:


[~pnowojski] Ok, we can plan it later, thanks. 



[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633992#comment-16633992
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221574949
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 ##
 @@ -196,30 +217,27 @@ object DataStreamTemporalJoinToCoProcessTranslator {
   s"Only single column join key is supported. " +
 s"Found ${joinInfo.rightKeys} in [$textualRepresentation]")
   }
-  val rightKey = joinInfo.rightKeys.get(0) + rightKeysStartingOffset
+  val rightJoinKeyInputReference = joinInfo.rightKeys.get(0) + rightKeysStartingOffset
 
-  val primaryKeyVisitor = new PrimaryKeyVisitor(textualRepresentation)
-  rightPrimaryKey.accept(primaryKeyVisitor)
+  val rightPrimaryKeyInputReference = extractInputReference(
+rightPrimaryKey,
+textualRepresentation)
 
-  primaryKeyVisitor.inputReference match {
-case None =>
-  throw new IllegalStateException(
-s"Failed to find primary key reference in [$textualRepresentation]")
-case Some(primaryKeyInputReference) if primaryKeyInputReference != rightKey =>
-  throw new ValidationException(
-s"Join key [$rightKey] must be the same as " +
-  s"temporal table's primary key [$primaryKeyInputReference] " +
-  s"in [$textualRepresentation]")
-case _ =>
-  rightPrimaryKey
+  if (rightPrimaryKeyInputReference != rightJoinKeyInputReference) {
+throw new ValidationException(
+  s"Join key [$rightJoinKeyInputReference] must be the same as " +
+s"temporal table's primary key [$rightPrimaryKey] " +
+s"in [$textualRepresentation]")
   }
+
+  rightPrimaryKey
 }
   }
 
   /**
 * Extracts input references from primary key expression.
 */
-  private class PrimaryKeyVisitor(textualRepresentation: String)
+  private class InputReferenceVisitor(textualRepresentation: String)
 
 Review comment:
   Update comment and exception as well.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633999#comment-16633999
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221578677
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
 
 Review comment:
   As mentioned before, we should split this class into a `CoProcessFunction` 
and a specialized `CoProcessOperator` that contains only a minimal extension. 
Currently, this class uses a lot of internal code.
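
For readers following the thread, the timer strategy described in the Scaladoc quoted above can be modelled without any Flink classes. The sketch below is a toy model of a single key, not the operator's actual code: the class name `SingleTimerSketch` and its methods are hypothetical, and it assumes a timer fires once the watermark is greater than or equal to the timer's timestamp.

```scala
import scala.collection.mutable

/** Toy model of one key: buffered probe-side event times plus at most one registered timer. */
final class SingleTimerSketch {
  private val buffered = mutable.ArrayBuffer.empty[Long]
  private var registeredTimer: Option[Long] = None

  /** Counts how many times the single timer has fired. */
  var timerFirings: Int = 0

  /** Buffers a record and keeps the one timer at the minimal buffered event time. */
  def addProbeRecord(eventTime: Long): Unit = {
    buffered += eventTime
    registeredTimer = Some(registeredTimer.fold(eventTime)(t => math.min(t, eventTime)))
  }

  /** Returns the event times processed by this watermark, in ascending order. */
  def advanceWatermark(watermark: Long): Seq[Long] =
    if (registeredTimer.exists(_ <= watermark)) {
      timerFirings += 1
      // One firing handles every buffered record that the watermark has passed.
      val (ready, pending) = buffered.partition(_ <= watermark)
      buffered.clear()
      buffered ++= pending
      // Re-register a single timer for the earliest record that is still pending.
      registeredTimer = if (pending.isEmpty) None else Some(pending.min)
      ready.sorted.toList
    } else Nil
}
```

With buffered event times {1, 2, 5, 8, 9}, a watermark of 10 leads to one firing that processes all five records, rather than five separate timer callbacks for the same key.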


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>  

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634012#comment-16634012
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221605524
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634004#comment-16634004
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221586676
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634009#comment-16634009
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221581884
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634007#comment-16634007
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221583374
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
 
 Review comment:
   `net` -> `next`




> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>   

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633991#comment-16633991
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221597635
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633990#comment-16633990
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221590550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634003#comment-16634003
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221585951
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634000#comment-16634000
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221592223
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient deletes of

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634002#comment-16634002
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221572043
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamTemporalJoinToCoProcessTranslator.scala
 ##
 @@ -121,12 +130,26 @@ object DataStreamTemporalJoinToCoProcessTranslator {
   rightSchema,
   joinInfo,
   rexBuilder,
-  temporalJoinConditionExtractor.leftTimeAttribute.get,
-  temporalJoinConditionExtractor.rightTimeAttribute,
-  temporalJoinConditionExtractor.rightPrimaryKeyExpression.get,
+  extractInputReference(
+temporalJoinConditionExtractor.leftTimeAttribute.get,
+textualRepresentation),
+  temporalJoinConditionExtractor.rightTimeAttribute.map(
+rightTimeAttribute =>
+  extractInputReference(rightTimeAttribute, textualRepresentation)
+- leftSchema.typeInfo.getTotalFields),
 
 Review comment:
   Use `leftSchema.arity` instead. `getTotalFields` is physical type 
information that includes nested field counts.
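
To illustrate why the two counts can differ, here is a minimal, self-contained sketch. It does not use Flink's classes: `FieldType`, `Atomic`, `Composite` and `rightIndex` are hypothetical stand-ins that only mimic the distinction between the number of top-level fields (arity) and the flattened field count, and the example schema and index are made up.

```scala
// Hypothetical stand-ins for type information; these are NOT Flink's classes.
sealed trait FieldType {
  /** Number of top-level fields. */
  def arity: Int
  /** Number of leaf fields after flattening all nested composites. */
  def totalFields: Int
}

case object Atomic extends FieldType {
  override val arity: Int = 1
  override val totalFields: Int = 1
}

final case class Composite(fields: Seq[FieldType]) extends FieldType {
  override def arity: Int = fields.size
  override def totalFields: Int = fields.map(_.totalFields).sum
}

object ArityVsTotalFields {
  // Made-up left input: (amount, address(street, city), rowtime); one field is nested.
  val leftSchema: Composite =
    Composite(Seq(Atomic, Composite(Seq(Atomic, Atomic)), Atomic))

  /** Converts an index into the joined row into an index into the right input. */
  def rightIndex(joinedIndex: Int, leftFieldCount: Int): Int =
    joinedIndex - leftFieldCount

  def main(args: Array[String]): Unit = {
    val joinedIndex = 4 // assumed position of the right time attribute in the joined row
    println(s"arity = ${leftSchema.arity}, totalFields = ${leftSchema.totalFields}")
    println(s"offset by arity:       ${rightIndex(joinedIndex, leftSchema.arity)}")
    println(s"offset by totalFields: ${rightIndex(joinedIndex, leftSchema.totalFields)}")
  }
}
```

With one nested field on the left side, the flattened count exceeds the arity by one, so subtracting it shifts the computed right-side index by one position.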


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634011#comment-16634011
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221581739
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to handle rows with
 
 Review comment:
   Explicitly add description for the key of the map. Because when I 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633996#comment-16633996
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221604468
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -92,6 +102,186 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 
   private val rexBuilder = new RexBuilder(typeFactory)
 
+  @Test
+  def testRowtime() {
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 0L), 0L))
 
 Review comment:
   Introduce a helper method to reduce the duplicated code for `new StreamRecord(CRow(v1, v2, t), t)`.
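
A possible shape for such a helper, sketched without Flink on the classpath. `Row`, `CRow` and `StreamRecord` below are simplified, hypothetical stand-ins for the real classes, and `orderRecord` is an illustrative name, not something taken from the PR.

```scala
// Simplified stand-ins so the sketch runs on its own; the real Flink classes differ.
final case class Row(fields: Seq[Any])
final case class CRow(row: Row, change: Boolean)
final case class StreamRecord[T](value: T, timestamp: Long)

object HarnessRecords {
  /**
    * Builds an order record whose rowtime field is also used as the record
    * timestamp, so each test line states the time only once.
    */
  def orderRecord(amount: Long, currency: String, rowtime: Long): StreamRecord[CRow] =
    StreamRecord(CRow(Row(Seq(amount, currency, rowtime)), change = true), rowtime)

  def main(args: Array[String]): Unit = {
    // Replaces the repeated `new StreamRecord(CRow(2L, "Euro", 0L), 0L)` pattern.
    val record = orderRecord(2L, "Euro", 0L)
    println(record)
  }
}
```

The main benefit is that the row's time field and the record timestamp cannot drift apart by accident in a test case.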




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634001#comment-16634001
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221556774
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoinToCoProcessTranslator.scala
 ##
 @@ -23,6 +23,8 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction
 
 Review comment:
   Unused import.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633998#comment-16633998
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r22123
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ##
 @@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 
 Review comment:
   Unused import.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633997#comment-16633997
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221569725
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalProcessTimeJoin.scala
 ##
 @@ -20,24 +20,24 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TimestampedCollector, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingCollector
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
 
-class TemporalJoin(
+class TemporalProcessTimeJoin(
 leftType: TypeInformation[Row],
 rightType: TypeInformation[Row],
 genJoinFuncName: String,
 genJoinFuncCode: String,
 queryConfig: StreamQueryConfig)
-  extends CoProcessFunction[CRow, CRow, CRow]
+  extends AbstractStreamOperator[CRow]
 
 Review comment:
   We should keep a clear separation between Flink functions and operators. All 
operator extensions should end up in `org.apache.flink.table.runtime.operators` 
and should only contain minimal changes that we could contribute back to 
`flink-core`. I suggest undoing the changes in this class.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634005#comment-16634005
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221579869
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older then currentWatermark.
 
 Review comment:
   Shouldn't this be older than or equal to currentWatermark?
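
A small sketch of the distinction raised here, not the operator's actual code. It relies on the fact that a Flink `Watermark(t)` signals that no more elements with a timestamp less than or equal to `t` are expected, so a row whose rowtime equals the watermark is already final. `due` and `nextTimer` are hypothetical helpers, and the rowtimes are the ones from the Javadoc example.

```scala
object SingleTimerSketch {
  /** Rowtimes that may be processed once the watermark has reached `watermark`. */
  def due(rowtimes: Seq[Long], watermark: Long, inclusive: Boolean): Seq[Long] =
    rowtimes.filter(t => if (inclusive) t <= watermark else t < watermark)

  /** The single timer to keep registered for a key: the smallest pending rowtime, if any. */
  def nextTimer(pending: Seq[Long]): Option[Long] =
    if (pending.isEmpty) None else Some(pending.min)

  def main(args: Array[String]): Unit = {
    val probeRowtimes = Seq(1L, 2L, 5L, 8L, 9L)
    val watermark = 9L

    val strictlyOlder = due(probeRowtimes, watermark, inclusive = false)
    val olderOrEqual = due(probeRowtimes, watermark, inclusive = true)

    // With a strict comparison the row at rowtime 9 stays pending and needs another timer.
    println(s"strictly older: $strictlyOlder, " +
      s"next timer: ${nextTimer(probeRowtimes.diff(strictlyOlder))}")
    // With an inclusive comparison everything up to the watermark is handled in one pass.
    println(s"older or equal: $olderOrEqual, " +
      s"next timer: ${nextTimer(probeRowtimes.diff(olderOrEqual))}")
  }
}
```

For `Watermark(10)` from the Javadoc example both variants select all five rows; they only differ when a rowtime coincides with the watermark.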




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633989#comment-16633989
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221583273
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
 
 Review comment:
   By the way, I find the mixture of left/right and probe/build naming confusing. Maybe call this 
`rightTimersStateName` instead?



[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634006#comment-16634006
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221586412
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to 
handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient 
deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633995#comment-16633995
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221595631
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* This could have been a MultiMap indexed by rowtime, but we have to 
handle rows with
+* duplicated rowtimes. We can not use List, because we need efficient 
deletes of 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634010#comment-16634010
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221580253
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
+  private val leftStateName = "left"
+  private val rightStateName = "right"
+  private val registeredTimerStateName = "timer"
+  private val probteTimersStateName = "probe-timers"
 
 Review comment:
   typo




> Support versioned joins with event time
> 

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633994#comment-16633994
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

twalthr commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r221580369
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build 
records to process
+  * on next watermark. The idea is that between watermarks we are collecting 
those elements
+  * and once we are sure that there will be no updates we emit the correct 
result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, 
where "old" is defined
+  * as older then the current watermark. Build side is also cleaned up in the 
similar fashion,
+  * however we always keep at least one record - the latest one - even if it's 
past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. 
It is achieved
+  * by registering timers for the keys. We could register a timer for every 
probe and build
+  * side element's event time (when watermark exceeds this timer, that's when 
we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered 
timers. For example
+  * with following evenTimes of probe records accumulated: {1, 2, 5, 8, 9}, if 
we
+  * had received Watermark(10), it would trigger 5 separate timers for the 
same key. To avoid that
+  * we always keep only one single registered timer for any given key, 
registered for the minimal
+  * value. Upon triggering it, we process all records with event times older 
then currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val netLeftIndexStateName = "next-index"
 
 Review comment:
   Use upper case to explicitly mark constants.




> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
>   

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633747#comment-16633747
 ] 

Piotr Nowojski commented on FLINK-9715:
---

[~hequn8128] Something like that could probably be implemented, but it is 
neither supported right now nor planned. It would also need a separate 
discussion to define its semantics. For append-only tables it would make 
sense (return only the rows between {{rowtime_s}} and {{rowtime_e}}?), but 
what about retractions and/or updates? If a row is removed or updated within 
this period, what should be returned?
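
To make the open question concrete, here is a small sketch of two possible readings of a range lookup. Nothing here exists in Flink: `Versioned`, `rowsBetween` and `latestPerKeyBetween` are hypothetical names, and the inclusive bounds are an assumption.

```scala
// Hypothetical row type, for illustration only.
final case class Versioned(rowtime: Long, key: String, value: Int)

object RangeLookupSketch {
  // Append-only reading: every row whose rowtime lies in [start, end] (bounds assumed inclusive).
  def rowsBetween(rows: Seq[Versioned], start: Long, end: Long): Seq[Versioned] =
    rows.filter(r => r.rowtime >= start && r.rowtime <= end)

  // One possible reading for an updating table: only the last version per key in the range.
  def latestPerKeyBetween(rows: Seq[Versioned], start: Long, end: Long): Map[String, Versioned] =
    rowsBetween(rows, start, end)
      .groupBy(_.key)
      .map { case (k, versions) => k -> versions.maxBy(_.rowtime) }
}
```

For a key that is updated twice inside the range, the first reading returns both versions and the second returns only the later one. Which of the two (if either) is intended is exactly what would have to be defined.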

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time





[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-09-29 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633187#comment-16633187
 ] 

Hequn Cheng commented on FLINK-9715:


Could the temporal table function return a table within a time range, i.e., 
Rates(rowtime_s, rowtime_e)?

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time





[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631806#comment-16631806
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski opened a new pull request #6776: [FLINK-9715][table] Support temporal 
join with event time
URL: https://github.com/apache/flink/pull/6776
 
 
   This PR adds support for temporal joins with event time. I haven't yet 
written the documentation for this feature, since it will have to be based on 
the documentation that is still under review: 
https://github.com/apache/flink/pull/6741
   
   ## Verifying this change
   
   The refactoring is covered by existing tests; the new features are covered 
by new unit tests and IT cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time


