[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704794#comment-16704794
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237875804
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala
 ##
 @@ -120,6 +120,16 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention(
 latestRegisteredCleanUpTimer.update(newCleanUpTime)
   }
 
+  protected def cleanUpLastTimer(): Unit = {
 
 Review comment:
   Well, it is tested in some of the test cases in `TemporalJoinHarnessTest`.
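To make the timer bookkeeping under discussion concrete, here is a minimal, self-contained Java model. It assumes the rule implied by the test comments (min retention 2 ms, max retention 4 ms): a new timer at `now + max` is registered only when `now + min` would outlive the current one, and `cleanUpLastTimer()` deletes the pending timer and forgets it. `RetentionTimerModel` and its `TreeSet` stand-in for the timer service are hypothetical; this is a sketch, not Flink's implementation.

```java
import java.util.TreeSet;

/**
 * Hypothetical model of the clean-up timer bookkeeping discussed in this review.
 * A TreeSet stands in for the processing-time timer service and a nullable Long
 * stands in for the latestRegisteredCleanUpTimer state. Not Flink code.
 */
final class RetentionTimerModel {
    final long minRetention;
    final long maxRetention;
    final TreeSet<Long> registeredTimers = new TreeSet<>();
    Long latestRegisteredCleanUpTimer; // null means "no timer registered"

    RetentionTimerModel(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Called on every access; re-registers only if the current timer would fire too early. */
    void registerCleanUpTimer(long currentProcessingTime) {
        Long current = latestRegisteredCleanUpTimer;
        if (current == null || currentProcessingTime + minRetention > current) {
            if (current != null) {
                registeredTimers.remove(current); // drop the outdated timer
            }
            long newCleanUpTime = currentProcessingTime + maxRetention;
            registeredTimers.add(newCleanUpTime);
            latestRegisteredCleanUpTimer = newCleanUpTime;
        }
    }

    /** Deletes the pending timer (if any) and clears the bookkeeping. */
    void cleanUpLastTimer() {
        Long current = latestRegisteredCleanUpTimer;
        if (current != null) {
            latestRegisteredCleanUpTimer = null;
            registeredTimers.remove(current);
        }
    }
}
```

With min=2 and max=4, accesses at processing times 1, 4 and 5 leave a single timer at 8 (1 + 4 = 5; then 4 + 2 = 6 > 5 moves it to 4 + 4 = 8; then 5 + 2 = 7 < 8 keeps it), matching the comments in the read-side test. Calling `cleanUpLastTimer()` afterwards leaves no timer behind.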


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704808#comment-16704808
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][FLINK-10584][table] Add State 
Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#issuecomment-443220452
 
 
   Thanks for the review @pnowojski. I integrated most of your comments and gave reasons for the ones I did not integrate. Let me know what you think.




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704805#comment-16704805
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237877929
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
 
 Review comment:
   The tests conceptually have 3 phases: the build-up, where we add some elements and set the initial cleanup timer; the update of the timer (from the left or the right side, depending on the test); and a final record that would otherwise produce a match but does not, because the state has been cleaned up.
   
   I agree that there is code duplication in the "building phase" of the tests, but even if we merge the two tests, we would still have to go through the "building phase" for the second one. So I am not sure we would save much code.
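The three phases can be traced with a small, self-contained Java model of the `testEventTimeCleanupTimerGettingOverwrittenFromReadSide` scenario. It is a hypothetical simplification (one key, one rate version, joining only when a watermark arrives, and a clean-up timer that drops all state for the key), not the operator or harness from the PR; `ThreePhaseScenario` and its methods are illustrative names. Running it with retention disabled also reproduces the baseline in which the late record still finds a match.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the read-side clean-up test:
 * one key, one rate version, deferred joining on watermarks, and
 * processing-time retention with min=2 / max=4. Not Flink code.
 */
final class ThreePhaseScenario {
    private final boolean retentionEnabled;
    private final long minRetention = 2L;
    private final long maxRetention = 4L;
    private final List<long[]> pendingOrders = new ArrayList<>(); // {amount, rowtime}
    private Long rate;         // build-side state; null once cleaned up
    private Long cleanUpTimer; // null means no timer registered
    private long processingTime;
    final List<String> output = new ArrayList<>();

    ThreePhaseScenario(boolean retentionEnabled) {
        this.retentionEnabled = retentionEnabled;
    }

    void setProcessingTime(long time) {
        processingTime = time;
        if (retentionEnabled && cleanUpTimer != null && time >= cleanUpTimer) {
            rate = null; // the timer fired: drop all state for the key
            pendingOrders.clear();
            cleanUpTimer = null;
        }
    }

    private void touch() {
        if (retentionEnabled && (cleanUpTimer == null || processingTime + minRetention > cleanUpTimer)) {
            cleanUpTimer = processingTime + maxRetention;
        }
    }

    void order(long amount, long rowtime) {
        touch();
        pendingOrders.add(new long[] {amount, rowtime});
    }

    void rate(long value) {
        touch();
        rate = value;
    }

    /** Joins every pending order at or below the watermark; unmatched ones are dropped. */
    void advanceWatermark(long watermark) {
        pendingOrders.removeIf(o -> {
            if (o[1] > watermark) {
                return false;
            }
            if (rate != null) {
                output.add(o[0] + "," + o[1] + "," + rate);
            }
            return true;
        });
    }

    static List<String> run(boolean retentionEnabled) {
        ThreePhaseScenario s = new ThreePhaseScenario(retentionEnabled);
        // Phase 1: build-up; the first access registers a timer at 1 + 4 = 5.
        s.setProcessingTime(1L);
        s.order(2L, 1L);
        s.rate(114L);
        s.advanceWatermark(2L);
        // Phase 2: a left-side access at time 4 moves the timer to 8 (4 + 2 > 5).
        s.setProcessingTime(4L);
        s.order(2L, 4L);
        s.setProcessingTime(5L);
        s.order(2L, 5L);
        s.advanceWatermark(5L);
        // Phase 3: the timer fires at 8, so the late order finds no rate.
        s.setProcessingTime(8L);
        s.order(2L, 7L);
        s.advanceWatermark(10L);
        return s.output;
    }
}
```

With retention enabled the output holds the rows for rowtimes 1, 4 and 5; with it disabled, the late order at rowtime 7 also matches, which is exactly the difference the final phase checks. Sharing the build-up in a helper like `run` is one way to reduce the duplication, although each test would still execute it.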




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703309#comment-16703309
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237520314
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -205,6 +203,7 @@ class TemporalRowtimeJoin(
 val rightRowsSorted = getRightRowsSorted(rightRowtimeComparator)
 var lastUnprocessedTime = Long.MaxValue
 
+// TODO: here we should check if the right side has anything or not (optimization).
 
 Review comment:
   @pnowojski What is your opinion on this one?
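A minimal sketch of the optimization the TODO hints at, assuming a simplified list-based model in which left (probe) rows are dropped once the watermark passes them and the right (build) versions only need sorting if there are any. `RowtimeJoinModel`, `emitResults` and the `rightSorts` counter are hypothetical names for illustration; Flink's `TemporalRowtimeJoin` works on keyed state rather than lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Hypothetical list-based model of one rowtime temporal-join firing; not Flink code. */
final class RowtimeJoinModel {
    static final class Row {
        final long time;
        final String payload;

        Row(long time, String payload) {
            this.time = time;
            this.payload = payload;
        }
    }

    final List<Row> left = new ArrayList<>();  // probe side, e.g. orders
    final List<Row> right = new ArrayList<>(); // build side, e.g. rate versions
    final List<String> output = new ArrayList<>();
    int rightSorts = 0; // how often the potentially expensive sort ran

    /** Emits results for left rows at or below the watermark; returns the earliest pending left time. */
    long emitResults(long watermark) {
        long lastUnprocessedTime = Long.MAX_VALUE;
        List<Row> rightSorted = null;
        if (!right.isEmpty()) { // the optimization: skip sorting when there is nothing to join with
            rightSorted = new ArrayList<>(right);
            rightSorted.sort(Comparator.comparingLong((Row r) -> r.time));
            rightSorts++;
        }
        Iterator<Row> it = left.iterator();
        while (it.hasNext()) {
            Row l = it.next();
            if (l.time > watermark) {
                lastUnprocessedTime = Math.min(lastUnprocessedTime, l.time);
                continue;
            }
            Row version = rightSorted == null ? null : latestVersionAtOrBefore(rightSorted, l.time);
            if (version != null) {
                output.add(l.payload + "," + version.payload);
            }
            it.remove(); // processed left rows are dropped whether or not they matched
        }
        return lastUnprocessedTime;
    }

    private static Row latestVersionAtOrBefore(List<Row> sorted, long time) {
        Row result = null;
        for (Row r : sorted) {
            if (r.time > time) {
                break;
            }
            result = r;
        }
        return result;
    }
}
```

Whether the check pays off depends on how expensive the sort is relative to an emptiness check on the right state; as pointed out in this review, such a check on state is not necessarily free either.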




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703291#comment-16703291
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237515137
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
 
 Review comment:
   I agree that this adds nothing to the test coverage. It uses the same code as the following tests and only serves to illustrate what would happen without State Retention enabled.
   
   I will remove it.




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703254#comment-16703254
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237507423
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+// this should clean-up the state
+testHarness.setProcessingTime(5L)
+
+assert(testHarness.numKeyedStateEntries() == 1) // this is the index
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {
 
 Review comment:
   nit: rename `eventtime` to `rowtime`. Also, we do not use the "read"/"write" side nomenclature, but either "probe/build" or "left/right".




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703275#comment-16703275
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237509884
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+// this should clean-up the state
+testHarness.setProcessingTime(5L)
+
+assert(testHarness.numKeyedStateEntries() == 1) // this is the index
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {
 
 Review comment:
   Yes.




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703273#comment-16703273
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237509724
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
 
 Review comment:
   Yes.




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703272#comment-16703272
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237509459
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+// this should clean-up the state
+testHarness.setProcessingTime(5L)
+
+assert(testHarness.numKeyedStateEntries() == 1) // this is the index
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+// this should update the clean-up timer to 8
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8)
+testHarness.setProcessingTime(5L)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L)))
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+// this should now clean up the state
+testHarness.setProcessingTime(8L)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match
+
+testHarness.processWatermark1(new Watermark(10L))
+testHarness.processWatermark2(new Watermark(10L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromWriteSide(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtime

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703259#comment-16703259
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237478157
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala
 ##
 @@ -120,6 +120,16 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention(
 latestRegisteredCleanUpTimer.update(newCleanUpTime)
   }
 
+  protected def cleanUpLastTimer(): Unit = {
 
 Review comment:
   Shouldn't this be covered by a test?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703255#comment-16703255
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237484681
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -175,6 +166,13 @@ class TemporalRowtimeJoin(
 if (lastUnprocessedTime < Long.MaxValue) {
   registerTimer(lastUnprocessedTime)
 }
+
+// if we have more state at any side, then update the timer, else clean it up.
 
 Review comment:
   I think we shouldn't execute this if `!stateCleaningEnabled`. Calling `rightState.iterator().hasNext` is not necessarily free and can sometimes be quite costly.
   
   Alternatively, you could modify `cleanUpState` to return whether the state is empty or not, but that would be more complicated, since `emitResultAndCleanUpState` already returns something.
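A minimal sketch of the suggested guard, assuming the timer decision after emitting results can be isolated into one method: when state cleaning is disabled the emptiness checks are skipped altogether, and when it is enabled they short-circuit. `CleanupDecision`, `StateView` and the check counter are hypothetical stand-ins for the operator, its left/right state, and a `rightState.iterator().hasNext`-style call.

```java
/** Hypothetical sketch of the timer decision after emitting results; not Flink code. */
final class CleanupDecision {
    /** Stands in for a state handle whose emptiness check may be costly. */
    interface StateView {
        boolean isEmpty();
    }

    /** Test double that counts how often the emptiness check runs. */
    static final class CountingState implements StateView {
        private final boolean empty;
        int checks = 0;

        CountingState(boolean empty) {
            this.empty = empty;
        }

        @Override
        public boolean isEmpty() {
            checks++;
            return empty;
        }
    }

    enum Action { NONE, UPDATE_TIMER, CLEAN_UP_TIMER }

    static Action afterEmit(boolean stateCleaningEnabled, StateView left, StateView right) {
        if (!stateCleaningEnabled) {
            return Action.NONE; // never pay for the emptiness checks
        }
        if (left.isEmpty() && right.isEmpty()) {
            return Action.CLEAN_UP_TIMER; // nothing left to retain: delete the pending timer
        }
        return Action.UPDATE_TIMER; // state remains on some side: keep the timer up to date
    }
}
```

Checking the left side first means the right side's check is skipped whenever the left still holds rows. The alternative of having `cleanUpState` report emptiness would avoid even that check, at the cost of changing return types.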




[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703260#comment-16703260
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237505985
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
 
 Review comment:
   Now that I think about it, I would add:
   ```
   public void processWatermarks(Watermark mark) throws Exception {
       twoInputOperator.processWatermark1(mark);
       twoInputOperator.processWatermark2(mark);
   }
   ```
   method to the `TwoInputStreamOperatorTestHarness`. This pattern of advancing 
both watermarks at the same time is duplicated in multiple (if not in all) 
places...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703261#comment-16703261
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237495768
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
 
 Review comment:
   I think `testEventTimeCleanupTimerGettingOverwrittenFromReadSide` supersedes
   this one (it tests the same thing and more), so you can just drop
   `testEventTimeCleanupShouldSucceed`.
   
   More generally, I would also extend
   `testEventTimeCleanupTimerGettingOverwrittenFromReadSide` to cover
   `testEventTimeCleanupTimerGettingOverwrittenFromWriteSide` as well, in one
   longer test.
   
   The problem is that, as it stands, there is a lot of code duplication between
   those tests: they share the same beginning, and instead of deduplicating them,
   IMO it's better/easier to just merge them into one test.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703256#comment-16703256
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237490138
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
 
 Review comment:
   I think `testEventTimeScenarioWithoutAdvancingProcessingTime` is already
   covered by existing tests (the ones I originally added), isn't it?


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703257#comment-16703257
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237500242
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
   0)
   }
 
+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+// this should clean-up the state
+testHarness.setProcessingTime(5L)
+
+assert(testHarness.numKeyedStateEntries() == 1) // this is the index
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+testHarness.open()
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.setProcessingTime(1L)
+
+// process without conversion rates
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(2L))
+testHarness.processWatermark2(new Watermark(2L))
+
+// this should update the clean-up timer to 8
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8)
+testHarness.setProcessingTime(5L)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L)))
+expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L)))
+
+testHarness.processWatermark1(new Watermark(5L))
+testHarness.processWatermark2(new Watermark(5L))
+
+// this should now clean up the state
+testHarness.setProcessingTime(8L)
+
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match
+
+testHarness.processWatermark1(new Watermark(10L))
+testHarness.processWatermark2(new Watermark(10L))
+
+verify(expectedOutput, testHarness.getOutput)
+
+testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromWriteSide(): Unit = {
+// min=2ms max=4ms
+val testHarness = createTestHarness(new 
OrdersRatesRo

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703258#comment-16703258
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: 
[FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237506741
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala
 ##
 @@ -738,6 +738,136 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
 testHarness.close()
   }
 
+  // -- Processing time TTL tests --
+
+  @Test
+  def testProcessingTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
 
 Review comment:
   Same comments as for the `rowtime` versions:
   - `testProcessingTimeScenarioWithoutAdvancingProcessingTime` seems
   unnecessary
   - either deduplicate the test code between
   `testProcessingTimeCleanupShouldSucceed`,
   `testProcessingTimeCleanupTimerGettingOverwrittenFromReadSide` &
   `testProcessingTimeCleanupTimerGettingOverwrittenFromWriteSide` or collapse
   them into a single test.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701610#comment-16701610
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][FLINK-10584][table] Add State 
Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#issuecomment-442382181
 
 
   @pnowojski and @twalthr I updated the PR to expose the state retention logic 
to the joins. Please review and let me know if you have any comments. 
   
   There is also a question, marked with a `TODO`, about a (small) potential
   optimization. Let me know if that comment seems correct.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701611#comment-16701611
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u edited a comment on issue #6871: [FLINK-10583][FLINK-10584][table] Add 
State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#issuecomment-442382181
 
 
   @pnowojski and @twalthr I updated the PR to expose the state retention logic 
to the joins. Please review and let me know if you have any comments. 
   
   There is also a question, marked with a `TODO`, about a (small) potential
   optimisation. Let me know if that comment seems correct.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699124#comment-16699124
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#issuecomment-441673911
 
 
   Thanks @twalthr! I will rebase the PR and have another look, and if Travis
   gives the green light, I will go ahead and merge.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699042#comment-16699042
 ] 

ASF GitHub Bot commented on FLINK-10583:


twalthr commented on issue #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#issuecomment-441662394
 
 
   I think it is not worth the effort to convert code to Java that will become
   unnecessary once we have proper DataStream API TTL support. The discussion
   about a Scala-free `flink-table` is also not finished yet.


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698934#comment-16698934
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#issuecomment-441642893
 
 
   @fhueske @twalthr @pnowojski 
   
   What should we do with this PR, given the effort to remove Scala from the SQL
   runtime code? Should I rewrite it in Java?
   
   In addition, does this overlap with the effort to finalise TTL support at
   the state backend level?


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655388#comment-16655388
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226345699
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala
 ##
 @@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util.Optional
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  *
+  * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time
+  * timers in your business logic. The reason is that:
+  *
+  * 1) if your timers collide with clean up timers and you delete them, then state
+  * clean-up will not be performed, and
+  *
+  * 2) (this one is the reason why this class does not allow to override the onProcessingTime())
+  * the onProcessingTime with your logic would be also executed on each clean up timer.
+  */
+@Internal
+abstract class BaseTwoInputStreamOperatorWithStateRetention(
 
 Review comment:
   nit: I would still drop the `Base` prefix from the name, but as you prefer.
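
   Point 1) of the class note follows from how keyed timers are stored: Flink keeps
   at most one timer per key and timestamp, so a business-logic timer registered
   for the same timestamp as the clean-up timer is the very same timer, and
   deleting it cancels the clean-up as well. The sketch below models only that set
   semantics in plain Java; `KeyedTimerSetModel` and its methods are hypothetical
   and are not part of Flink's `TimerService` API.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   /**
    * Hypothetical model of a keyed processing-time timer service with set
    * semantics: registering the same (key, timestamp) twice yields one timer.
    */
   class KeyedTimerSetModel {
       private final Map<String, Set<Long>> timersByKey = new HashMap<>();

       void register(String key, long timestamp) {
           timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
       }

       void delete(String key, long timestamp) {
           Set<Long> timers = timersByKey.get(key);
           if (timers != null) {
               timers.remove(timestamp);
           }
       }

       boolean hasTimer(String key, long timestamp) {
           Set<Long> timers = timersByKey.get(key);
           return timers != null && timers.contains(timestamp);
       }

       int timerCount(String key) {
           Set<Long> timers = timersByKey.get(key);
           return timers == null ? 0 : timers.size();
       }
   }
   ```

   Registering a clean-up timer and a business timer for `("Euro", 5)` leaves a
   single timer; deleting the business timer then leaves none, so the clean-up
   never fires.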


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655357#comment-16655357
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#issuecomment-431040705
 
 
   done!


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655341#comment-16655341
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226333482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL(
 timerService = new SimpleTimerService(internalTimerService)
   }
 
-  override def processElement1(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  override def processElement2(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  private def registerProcessingCleanupTimer(): Unit = {
+  /**
+* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this
+* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`.
+*
+* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]]
+* method is called.
+*/
+  protected def registerProcessingCleanUpTimer(): Unit = {
 if (stateCleaningEnabled) {
   val currentProcessingTime = timerService.currentProcessingTime()
-  val currentCleanupTime = cleanupTimeState.value()
+  val currentCleanUpTime = Option(cleanUpTimeState.value())
 
-  if (currentCleanupTime == null
-|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+  if (currentCleanUpTime.isEmpty
+|| (currentProcessingTime + minRetentionTime) > currentCleanUpTime.get) {
 
-updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+updateCleanUpTimer(currentProcessingTime, currentCleanUpTime)
   }
 }
   }
 
-  /**
-* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
-* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
-* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
-* cleanup timer is registered per key.
-*/
-  private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = {
-if (currentCleanupTime != null) {
-  timerService.deleteProcessingTimeTimer(currentCleanupTime)
+  private def updateCleanUpTimer(
+  currentProcessingTime: JLong,
+  currentCleanUpTime: Option[JLong]): Unit = {
+
+val cleanUpTime: JLong = currentCleanUpTime.getOrElse(-1L)
+if (cleanUpTime != -1) {
 
 Review comment:
   I agree about the cleanness of the first solution.
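
   For reference, the update rule under discussion can be modeled for a single
   key: a new timer is registered only if `now + minRetentionTime` would fire
   later than the current one, and the new timer is set to
   `now + maxRetentionTime`, replacing the old one so that at most one timer
   exists per key. This is a hypothetical plain-Java sketch (`RetentionTimerModel`
   is not a Flink class) with a nullable `Long` standing in for
   `cleanUpTimeState`; the `maxRetentionTime` target is inferred from the removed
   `updateCleanupTimer` scaladoc and the harness-test comments.

   ```java
   /**
    * Hypothetical single-key model of the clean-up timer bookkeeping: at most one
    * timer is kept, and it is moved only when now + minRetention would outlive it.
    */
   class RetentionTimerModel {
       private final long minRetention;
       private final long maxRetention;
       // Stands in for cleanUpTimeState; null means "no clean-up timer registered".
       private Long cleanUpTime = null;
       private int cleanUps = 0;

       RetentionTimerModel(long minRetention, long maxRetention) {
           this.minRetention = minRetention;
           this.maxRetention = maxRetention;
       }

       /** Called for every element (from either input) at processing time `now`. */
       void onElement(long now) {
           if (cleanUpTime == null || now + minRetention > cleanUpTime) {
               // Overwriting the field models "delete the old timer, register a new one".
               cleanUpTime = now + maxRetention;
           }
       }

       /** Advances processing time and fires the clean-up if the timer is due. */
       void advanceTo(long now) {
           if (cleanUpTime != null && now >= cleanUpTime) {
               cleanUpTime = null; // timer consumed, state cleared
               cleanUps++;
           }
       }

       Long cleanUpTime() {
           return cleanUpTime;
       }

       int cleanUps() {
           return cleanUps;
       }
   }
   ```

   With `min=2`, `max=4` this reproduces the trace asserted by the read-side
   harness test: an element at 1 sets the timer to 5; an element at 4 gives
   4 + 2 = 6 > 5, so the timer moves to 8; an element at 5 gives 7, which is not
   greater than 8, so the timer stays; advancing to 8 fires the clean-up.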


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655324#comment-16655324
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226325589
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL(
 timerService = new SimpleTimerService(internalTimerService)
   }
 
-  override def processElement1(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  override def processElement2(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  private def registerProcessingCleanupTimer(): Unit = {
+  /**
+* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this
+* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`.
+*
+* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]]
+* method is called.
+*/
+  protected def registerProcessingCleanUpTimer(): Unit = {
 if (stateCleaningEnabled) {
   val currentProcessingTime = timerService.currentProcessingTime()
-  val currentCleanupTime = cleanupTimeState.value()
+  val currentCleanUpTime = Option(cleanUpTimeState.value())
 
 Review comment:
   According to @twalthr, we should avoid using Scala classes in the runtime. I
   didn't like that, but I changed my `Option` usages to Java's `Optional`.
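
   A minimal Java sketch of the `Optional`-based variant, assuming the state
   accessor returns `null` when nothing has been stored yet (as `ValueState.value()`
   does without a default value): `Optional.ofNullable` plays the role of Scala's
   `Option(...)`. `NullableLongState` and `CleanUpTimeReader` are hypothetical
   stand-ins, not Flink classes.

   ```java
   import java.util.Optional;

   /** Hypothetical stand-in for a keyed ValueState<Long>: value() is null until updated. */
   class NullableLongState {
       private Long value;

       Long value() {
           return value;
       }

       void update(Long newValue) {
           value = newValue;
       }
   }

   class CleanUpTimeReader {
       /** Wraps the possibly-null state value, as Scala's Option(...) would. */
       static Optional<Long> currentCleanUpTime(NullableLongState state) {
           return Optional.ofNullable(state.value());
       }

       /** True when no timer exists yet, or now + minRetention fires after the current one. */
       static boolean shouldRegister(NullableLongState state, long now, long minRetention) {
           return currentCleanUpTime(state)
                   .map(current -> now + minRetention > current)
                   .orElse(true);
       }
   }
   ```

   The `map(...).orElse(true)` chain replaces the explicit `isEmpty || ...` check
   from the Scala version; which form reads better is a matter of taste.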


[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655327#comment-16655327
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226330817
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -39,6 +38,15 @@ import org.apache.flink.table.runtime.types.CRow
   * state to cleanup and what further action to take.
   *
   * This class takes care of maintaining at most one timer per key.
+  *
+  * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time
+  * timers in your business logic. The reason is that:
+  *
+  * 1) if your timers collide with clean up timers and you delete them, then state
+  * clean-up will not be performed, and
+  *
+  * 2) (this one is the reason why this class does not allow to override the onProcessingTime())
+  * the onProcessingTime with your logic would be also executed on each clean up timer.
   */
 @PublicEvolving
 abstract class AbstractTwoInputStreamOperatorWithTTL(
 
 Review comment:
   Maybe rename it to `TwoInputStreamOperatorWithStateRetention`? I think the name 
"TTL" is easily confused with the TTL in the state backend, while "state retention" 
is something different.
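The IMPORTANT NOTE quoted in this hunk warns that user timers can collide with cleanup timers. Here is a minimal sketch of why deleting a colliding timer loses the cleanup. It assumes, as Flink's timer service does, that at most one timer is kept per key and timestamp. `DedupTimerService` and `TimerCollisionDemo` are hypothetical names that model that behaviour with a plain mutable set, not Flink's actual classes.

```scala
import scala.collection.mutable

// Hypothetical model of a per-key processing-time timer service that, like
// Flink's, keeps at most one timer per (key, timestamp) pair.
class DedupTimerService[K] {
  private val timers = mutable.Set.empty[(K, Long)]

  def register(key: K, time: Long): Unit = timers += ((key, time))
  def delete(key: K, time: Long): Unit = timers -= ((key, time))
  def isRegistered(key: K, time: Long): Boolean = timers.contains((key, time))
}

object TimerCollisionDemo {
  /** Returns true if the cleanup timer survives the user's register/delete pair. */
  def cleanUpTimerSurvives(cleanUpTime: Long, userTime: Long): Boolean = {
    val service = new DedupTimerService[String]
    service.register("key-1", cleanUpTime) // registered by the base operator
    service.register("key-1", userTime)    // registered by business logic
    service.delete("key-1", userTime)      // business logic cancels "its" timer
    service.isRegistered("key-1", cleanUpTime)
  }
}
```

When the user's timestamp differs, the cleanup timer survives. When it is equal, the second registration is a no-op, and the user's delete silently removes the cleanup timer.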




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655326#comment-16655326
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226329300
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -51,11 +59,11 @@ abstract class AbstractTwoInputStreamOperatorWithTTL(
   private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
   private val stateCleaningEnabled: Boolean = minRetentionTime > 1
 
-  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val CLEANUP_TIMESTAMP = "cleanup-timestamp"
   private val TIMERS_STATE_NAME = "timers"
 
   // the latest registered cleanup timer
-  private var cleanupTimeState: ValueState[JLong] = _
+  private var cleanUpTimeState: ValueState[JLong] = _
 
 Review comment:
   Rename the variable to `latestRegisteredCleanUpTimer` and drop the comment. The 
name `cleanUpTimeState` doesn't say much.
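The value stored in this variable is what lets the operator keep at most one cleanup timer per key. Here is a minimal, Flink-free sketch of that bookkeeping. It assumes a plain map can stand in for the keyed `ValueState` and a set can stand in for the timer service; `CleanUpTimerTracker` is a hypothetical name. Following the quoted `registerProcessingCleanupTimer()`/`updateCleanupTimer()` and their Scaladoc, a new timer is placed at `now + maxRetentionTime`. This happens only when no timer exists yet, or when `now + minRetentionTime` is later than the latest registered timer.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the operator's bookkeeping: a map plays the role of
// the keyed ValueState `latestRegisteredCleanUpTimer`, a set the timer service.
class CleanUpTimerTracker[K](minRetentionTime: Long, maxRetentionTime: Long) {
  private val latestRegisteredCleanUpTimer = mutable.Map.empty[K, Long]
  private val registeredTimers = mutable.Set.empty[(K, Long)]

  /** Called for every element of `key` arriving at processing time `now`. */
  def onElement(key: K, now: Long): Unit = {
    val current = latestRegisteredCleanUpTimer.get(key)
    if (current.isEmpty || now + minRetentionTime > current.get) {
      // Keep at most one timer per key: drop the old one before registering anew.
      current.foreach(old => registeredTimers -= ((key, old)))
      val newCleanUpTime = now + maxRetentionTime
      registeredTimers += ((key, newCleanUpTime))
      latestRegisteredCleanUpTimer(key) = newCleanUpTime
    }
  }

  /** The cleanup timers currently registered for `key`. */
  def timersFor(key: K): Set[Long] =
    registeredTimers.collect { case (k, t) if k == key => t }.toSet
}
```

With a minimum of 2 and a maximum of 4, an element at time 1 yields a timer at 5. A second element at time 2 leaves it unchanged. An element at time 4 moves it to 8. Other keys are tracked independently.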




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655325#comment-16655325
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226329029
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL(
 timerService = new SimpleTimerService(internalTimerService)
   }
 
-  override def processElement1(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  override def processElement2(element: StreamRecord[CRow]): Unit = {
-registerProcessingCleanupTimer()
-  }
-
-  private def registerProcessingCleanupTimer(): Unit = {
+  /**
+* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this
+* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`.
+*
+* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]]
+* method is called.
+*/
+  protected def registerProcessingCleanUpTimer(): Unit = {
 if (stateCleaningEnabled) {
   val currentProcessingTime = timerService.currentProcessingTime()
-  val currentCleanupTime = cleanupTimeState.value()
+  val currentCleanUpTime = Option(cleanUpTimeState.value())
 
-  if (currentCleanupTime == null
-|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+  if (currentCleanUpTime.isEmpty
+|| (currentProcessingTime + minRetentionTime) > currentCleanUpTime.get) {
 
-updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+updateCleanUpTimer(currentProcessingTime, currentCleanUpTime)
   }
 }
   }
 
-  /**
-* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
-* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
-* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
-* cleanup timer is registered per key.
-*/
-  private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = {
-if (currentCleanupTime != null) {
-  timerService.deleteProcessingTimeTimer(currentCleanupTime)
+  private def updateCleanUpTimer(
+  currentProcessingTime: JLong,
+  currentCleanUpTime: Option[JLong]): Unit = {
+
+val cleanUpTime: JLong = currentCleanUpTime.getOrElse(-1L)
+if (cleanUpTime != -1) {
 
 Review comment:
   nit:
   ```
   if (currentCleanUpTime.isDefined) {
     timerService.deleteProcessingTimeTimer(currentCleanUpTime.get)
   }
   ```
   In Java I would even use:
   ```
   currentCleanUpTime.ifPresent(timerService::deleteProcessingTimeTimer);
   ```
   (or something along those lines; I don't know the syntax in Scala, and 
@twalthr says that Scala <-> Java lambda interoperability is poorly 
executed)
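For reference, here is a minimal sketch of the nit in both forms. It assumes a Scala 2.12+ compiler, which converts a Scala lambda into a Java functional interface such as `java.util.function.Consumer`. `FakeTimerService` is a hypothetical stand-in that only records deleted timestamps; the operator itself would call `deleteProcessingTimeTimer(long)` on its `SimpleTimerService`. The parameter type on the `ifPresent` lambda is written out so that inference does not depend on the wildcard in `Consumer<? super T>`.

```scala
import java.lang.{Long => JLong}
import java.util.Optional
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a timer service; it only records deleted timestamps.
class FakeTimerService {
  val deleted = ArrayBuffer.empty[Long]
  def deleteProcessingTimeTimer(time: Long): Unit = deleted += time
}

object DeleteOldTimer {
  // scala.Option: foreach runs the deletion only when a timer exists.
  def withOption(service: FakeTimerService, current: Option[JLong]): Unit =
    current.foreach(t => service.deleteProcessingTimeTimer(t))

  // java.util.Optional: ifPresent takes a Consumer; the Scala lambda is SAM-converted.
  def withOptional(service: FakeTimerService, current: Optional[JLong]): Unit =
    current.ifPresent((t: JLong) => service.deleteProcessingTimeTimer(t))
}
```

Both variants skip the deletion entirely when no cleanup timer has been registered. This makes the `-1` sentinel in the quoted `updateCleanUpTimer` unnecessary.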
   
   




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655289#comment-16655289
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on issue #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#issuecomment-431026956
 
 
   @pnowojski I integrated your comments. Let me know what you think.




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654867#comment-16654867
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226215552
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamR

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654866#comment-16654866
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226215275
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
 
 Review comment:
   The problem here is that we need access to the operator, and different types of 
operators are used in the tests. I will try to create a single operator with all 
the needed functionality and put all the setup/teardown logic in `@Before` and 
`@After` methods. 
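However the shared setup ends up being structured, the scenarios quoted above differ only in the processing times at which elements for one key arrive. Here is a minimal sketch that derives each expected fired timer from those arrival times. It assumes the 2 ms minimum and 4 ms maximum retention of the quoted `TestStreamQueryConfig`, and it assumes every arrival happens before the previously registered timer fires. `ExpectedCleanUpTime.expectedFiredCleanUpTime` is a hypothetical helper, not part of the PR.

```scala
object ExpectedCleanUpTime {
  /**
   * Hypothetical helper: given the processing times at which elements for one
   * key arrive (on either input), returns the cleanup timer expected to fire.
   */
  def expectedFiredCleanUpTime(
      arrivals: Seq[Long],
      minRetentionTime: Long,
      maxRetentionTime: Long): Option[Long] =
    arrivals.foldLeft(Option.empty[Long]) { (current, now) =>
      // Re-register only if there is no timer yet or now + min passes it.
      if (current.forall(t => now + minRetentionTime > t)) Some(now + maxRetentionTime)
      else current
    }
}
```

This reproduces the values asserted in the tests above. It yields 5 for a single element at 1, 5 for elements at 1 and 2, and 8 for elements at 1 and 4.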




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654863#comment-16654863
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base 
TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r226214592
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
+   

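The class quoted in this hunk makes `open()`, `processElement1()` and `processElement2()` final and delegates to the `onOpen()`, `onProcessElement1()` and `onProcessElement2()` hooks. As a result, the cleanup timer is always refreshed after the subclass logic runs, and a subclass cannot skip it. Here is a minimal, Flink-free sketch of that template-method structure. It assumes `String` elements and a log buffer in place of real records and timers; `BaseWithCleanUp` and `LoggingJoin` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of the template-method structure in the quoted
// class: the entry points are final, subclasses only implement the hooks.
abstract class BaseWithCleanUp {
  val log = ArrayBuffer.empty[String]

  final def processElement1(element: String): Unit = {
    onProcessElement1(element)       // subclass business logic first
    registerProcessingCleanUpTimer() // then the base class refreshes the timer
  }

  final def processElement2(element: String): Unit = {
    onProcessElement2(element)
    registerProcessingCleanUpTimer()
  }

  protected def onProcessElement1(element: String): Unit
  protected def onProcessElement2(element: String): Unit

  // Stands in for the timer bookkeeping; only records that it ran.
  private def registerProcessingCleanUpTimer(): Unit = log += "register-cleanup-timer"
}

class LoggingJoin extends BaseWithCleanUp {
  override protected def onProcessElement1(element: String): Unit = log += s"left:$element"
  override protected def onProcessElement2(element: String): Unit = log += s"right:$element"
}
```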
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653695#comment-16653695
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957516
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653689#comment-16653689
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225960810
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new St

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653697#comment-16653697
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957980
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653691#comment-16653691
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225956138
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
 
 Review comment:
   I would avoid nulls: wrap this variable in an `Option` and pass the `Option`
to the `updateCleanupTimer` method. Handling nullable variables without
compile-time checks is very dangerous and error-prone.
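A minimal, Flink-free sketch of what the `Option` suggestion could look like. It is an illustration under stated assumptions, not the PR's code: the class name `OptionCleanupTimers` and the `registered`/`deleted` buffers are hypothetical, and a plain nullable `java.lang.Long` field stands in for the `ValueState[JLong]` returned by `cleanupTimeState.value()`. The nullable value is converted once, at the boundary, and only an `Option[Long]` is passed on to `updateCleanupTimer`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of the reviewer's suggestion. A plain
// java.lang.Long field stands in for ValueState[JLong]; null means that no
// cleanup timer has been registered for the key yet.
class OptionCleanupTimers(minRetentionTime: Long, maxRetentionTime: Long) {
  private var cleanupTimeState: java.lang.Long = null

  // Timestamps of simulated timer registrations and deletions.
  val registered: ArrayBuffer[Long] = ArrayBuffer.empty[Long]
  val deleted: ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  def registerProcessingCleanupTimer(currentProcessingTime: Long): Unit = {
    // Convert the nullable value exactly once; Option(null) is None.
    val currentCleanupTime: Option[Long] =
      Option(cleanupTimeState).map(_.longValue)

    // Register if there is no timer yet, or if the current one would fire
    // before the minimum retention time has passed.
    if (currentCleanupTime.forall(currentProcessingTime + minRetentionTime > _)) {
      updateCleanupTimer(currentProcessingTime, currentCleanupTime)
    }
  }

  private def updateCleanupTimer(
      currentProcessingTime: Long,
      currentCleanupTime: Option[Long]): Unit = {
    currentCleanupTime.foreach(t => deleted += t) // delete the old timer, if any
    val newCleanupTime = currentProcessingTime + maxRetentionTime
    registered += newCleanupTime
    cleanupTimeState = java.lang.Long.valueOf(newCleanupTime)
  }
}
```

With a minimum retention of 2 and a maximum of 4 (the values the test's `TestStreamQueryConfig` uses), calls at processing times 1, 2 and 4 register timers at 5 and 8 and delete the one at 5, which lines up with the `hasFiredTimers(5L)` and `hasFiredTimers(8L)` expectations in the harness tests.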



[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653694#comment-16653694
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225954133
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
 
 Review comment:
   As we discussed, I would slightly lean toward dropping the `final` keywords
here, dropping the `onXYZ` proxy/wrapper methods, and relying on the user to do
something like:
   ```
   override def open(): Unit = {
     super.open()
     // my fancy logic.
   }
   ```
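A small, Flink-free sketch of the design proposed here, assuming a base class whose `open()` is not `final` and which has no `onOpen()` hook. The names `BaseOperatorWithTTL`, `MyTemporalJoin` and `setupLog` are hypothetical and exist only to make the call order visible; the setup steps are placeholders for the timer-service and cleanup-state initialization in the quoted diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of the proposed design: open() is not
// final, there is no onOpen() hook, and subclasses call super.open().
abstract class BaseOperatorWithTTL {
  // Records the order in which setup steps ran, for illustration only.
  val setupLog: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def open(): Unit = {
    setupLog += "base: initialize timer service"
    setupLog += "base: initialize cleanup-timestamp state"
  }
}

class MyTemporalJoin extends BaseOperatorWithTTL {
  override def open(): Unit = {
    super.open() // omitting this call would silently skip the TTL setup
    setupLog += "subclass: my fancy logic"
  }
}
```

The trade-off: the `final open()` plus `onOpen()` hook in the PR guarantees that the base-class setup always runs, while the `super.open()` style is more conventional but relies on every subclass remembering the call.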




> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653698#comment-16653698
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957668
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+onProcessElement1(element)
+registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+onProcessElement2(element)
+registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+if (stateCleaningEnabled) {
+  val currentProcessingTime = timerService.currentProcessingTime()
+  val currentCleanupTime = cleanupTimeState.value()
+
+  if (currentCleanupTime == null
+|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+  }
+}
+  }
+
+  /**
+* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653687#comment-16653687
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225955291
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+initializeTimerService()
+
+if (stateCleaningEnabled) {
+  val inputCntDescriptor: ValueStateDescriptor[JLong] =
+new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+  cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+}
+
+onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+val internalTimerService = getInternalTimerService(
+  TIMERS_STATE_NAME,
+  VoidNamespaceSerializer.INSTANCE,
+  this)
+
+timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
 
 Review comment:
   Here I'm not sure. I would either do the same as for `open`, or leave these
methods unimplemented and rely on the user to write something like:
   ```
   override def processElement1(element: StreamRecord[CRow]): Unit = {
     registerProcessingCleanupTimer()
     // my fancy logic
   }
   ```
   The argument in favour of the second option is that we may want to
register/bump the timers only in `processElement2` (whenever we touch the build
side) and not in `processElement1`.
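A Flink-free sketch of the second option, under simplifying assumptions: the base class leaves `processElement1`/`processElement2` abstract and exposes the timer helper as `protected`, so a subclass can bump the cleanup timer only when the build side (input 2) is touched. All names here (`TwoInputWithTTLBase`, `BuildSideTtlJoin`, `registeredTimers`) are hypothetical. Processing time is passed in explicitly rather than read from a timer service, and deleting the previous timer is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model: processElement1/2 are left to subclasses,
// and the timer helper is protected so each subclass decides which input
// registers or bumps the cleanup timer. Deleting the old timer is omitted.
abstract class TwoInputWithTTLBase(minRetentionTime: Long, maxRetentionTime: Long) {
  private var cleanupTime: Option[Long] = None
  val registeredTimers: ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  protected def registerProcessingCleanupTimer(currentProcessingTime: Long): Unit = {
    if (cleanupTime.forall(currentProcessingTime + minRetentionTime > _)) {
      val next = currentProcessingTime + maxRetentionTime
      registeredTimers += next
      cleanupTime = Some(next)
    }
  }

  def processElement1(currentProcessingTime: Long): Unit
  def processElement2(currentProcessingTime: Long): Unit
}

// Only touching the build side (input 2) registers or bumps the timer.
class BuildSideTtlJoin extends TwoInputWithTTLBase(2L, 4L) {
  override def processElement1(currentProcessingTime: Long): Unit = {
    // probe side: look up the build-side state and emit results (omitted)
  }

  override def processElement2(currentProcessingTime: Long): Unit = {
    registerProcessingCleanupTimer(currentProcessingTime)
    // build side: update the versioned state (omitted)
  }
}
```

In this model a probe-side element at time 1 registers nothing, and a build-side element at time 3 registers a timer at 7 (3 plus the maximum retention of 4).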




> Add support for state retention to the Processing Time versioned joins.
> ---
>
>

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653690#comment-16653690
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225965180
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new St

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653692#comment-16653692
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add 
base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225962699
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new St

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653693#comment-16653693
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225957024
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala
 ##
 @@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to clean up and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+    queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+    initializeTimerService()
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+
+    onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+    onProcessElement1(element)
+    registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+    onProcessElement2(element)
+    registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+    if (stateCleaningEnabled) {
+      val currentProcessingTime = timerService.currentProcessingTime()
+      val currentCleanupTime = cleanupTimeState.value()
+
+      if (currentCleanupTime == null
+        || (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+        updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+      }
+    }
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+    * registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+    * method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653688#comment-16653688
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225959340
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
 
 Review comment:
   Could you deduplicate the setup/closing logic of those methods? For example, extract
   ```
   val operator: StubOperatorWithTTLTimers =
     new StubOperatorWithTTLTimers(streamQueryConfig)
   testHarness = createTestHarness(operator)
   testHarness.open()
   ```
   to a setup method or to a `createStubHarnessWithTTLTimers()` method.
   
   If you pick the `setup` method, `close` could be moved to `tearDown` (but that would require `testHarness` to be a field, not a local variable).
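One possible shape for the helper the reviewer asks for is a loan-pattern method that creates and opens the harness, lends it to the test body, and always closes it. This is a sketch, not necessarily what the PR ended up using: `FakeHarness`, `HarnessFixture`, and `withOpenHarness` are hypothetical names, and `FakeHarness` only stands in for `KeyedTwoInputStreamOperatorTestHarness` so the pattern runs without Flink on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for KeyedTwoInputStreamOperatorTestHarness: it only
// records lifecycle calls so the pattern runs without Flink on the classpath.
final class FakeHarness {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def open(): Unit = calls += "open"
  def setProcessingTime(time: Long): Unit = calls += s"time=$time"
  def close(): Unit = calls += "close"
}

object HarnessFixture {
  // Creates and opens a harness, lends it to the test body, and always closes
  // it afterwards, even if the body throws. Returns whatever the body returns.
  def withOpenHarness[A](create: () => FakeHarness)(body: FakeHarness => A): A = {
    val harness = create()
    harness.open()
    try body(harness)
    finally harness.close()
  }
}
```

In a real test the parameter type would be the Flink harness, `create` would wrap `createTestHarness(operator)`, and the body would drive `setProcessingTime` and `processElement1`/`processElement2`. Because `close()` runs before `withOpenHarness` returns, an `assertThat(operator, hasFiredTimers(...))` placed after the call keeps the close-then-assert order the quoted tests use.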


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653696#comment-16653696
 ] 

ASF GitHub Bot commented on FLINK-10583:


pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225961569
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+Time.milliseconds(2),
+Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(10L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(2L)
+testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+testHarness.setProcessingTime(4L)
+testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+testHarness.setProcessingTime(20L)
+testHarness.close()
+
+assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+val operator: StubOperatorWithTTLTimers =
+  new StubOperatorWithTTLTimers(streamQueryConfig)
+
+testHarness = createTestHarness(operator)
+
+testHarness.open()
+
+testHarness.setProcessingTime(1L)
+testHarness.processElement1(new St

[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653601#comment-16653601
 ] 

ASF GitHub Bot commented on FLINK-10583:


kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871
 
 
   ## What is the purpose of the change
   
   This is the first step for the implementation of FLINK-10583 and FLINK-10584.
   
   It introduces the `AbstractTwoInputStreamOperatorWithTTL`, which contains the basic functionality for implementing state TTL based on timers.
   
   This operator makes sure that:
   1) at most one timer is registered per key, and
   2) both "sides" of the operator (`processElement1` and `processElement2`) are treated equally when it comes to registering cleanup timers.
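The two guarantees above, together with the rule in the quoted `registerProcessingCleanupTimer()` (register a timer at `now + maxRetentionTime` when the key has none, or when `now + minRetentionTime` is later than the registered one), can be illustrated with a small Flink-free model. This is a sketch, not the operator's code: `CleanupTimerModel`, `advanceTo`, `pendingTimers`, and `fired` are hypothetical names, overwriting a map entry stands in for deleting the old timer and registering a new one, and firing every timer whose timestamp is at or before the new time is an assumed simplification of how processing-time timers fire when the harness clock advances.

```scala
import scala.collection.mutable

// A Flink-free model of the cleanup-timer rule quoted in this PR. It is not the
// operator's code: timestamps are plain Longs and timer firing is simulated.
final class CleanupTimerModel(minRetention: Long, maxRetention: Long) {
  // Mirrors `stateCleaningEnabled = minRetentionTime > 1` in the quoted operator.
  private val enabled: Boolean = minRetention > 1

  // key -> timestamp of the single cleanup timer registered for that key.
  // Overwriting an entry stands in for "delete old timer, register new one".
  private val registered = mutable.Map.empty[Long, Long]

  // Timestamps of timers that have fired, in firing order.
  val fired: mutable.ArrayBuffer[Long] = mutable.ArrayBuffer.empty[Long]

  // Both inputs go through the same rule, so they share one timer per key.
  def processElement1(key: Long, now: Long): Unit = registerCleanupTimer(key, now)
  def processElement2(key: Long, now: Long): Unit = registerCleanupTimer(key, now)

  // Register when the key has no timer, or when now + minRetention is later
  // than the currently registered timer; the new timer is at now + maxRetention.
  private def registerCleanupTimer(key: Long, now: Long): Unit =
    if (enabled && registered.get(key).forall(current => now + minRetention > current)) {
      registered(key) = now + maxRetention
    }

  // Number of keys that currently hold a pending cleanup timer.
  def pendingTimers: Int = registered.size

  // Advances processing time, firing every pending timer with timestamp <= now.
  def advanceTo(now: Long): Unit = {
    val due = registered.toSeq.filter { case (_, ts) => ts <= now }.sortBy(_._2)
    due.foreach { case (key, ts) =>
      registered.remove(key)
      fired += ts
    }
  }
}
```

With the test configuration quoted earlier in this thread (minimum retention 2 ms, maximum retention 4 ms), an element at time 1 yields a timer at 5; a second element for the same key at time 2, from either input, leaves it at 5 because 2 + 2 is not later than 5; a second element at time 4 moves it to 8 because 4 + 2 = 6 is later than 5. These match the values the first three harness tests expect.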
   
   ## Verifying this change
   
   Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   
   R @pnowojski 

