[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704794#comment-16704794 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237875804

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala ##
@@ -120,6 +120,16 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention(
     latestRegisteredCleanUpTimer.update(newCleanUpTime)
   }

+  protected def cleanUpLastTimer(): Unit = {

Review comment:
Well, it is exercised by some of the test cases in `TemporalJoinHarnessTest`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.7.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704808#comment-16704808 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on issue #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#issuecomment-443220452

Thanks for the review @pnowojski. I integrated most of your comments, and for the ones that I did not integrate I gave my reasons. Let me know what you think.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704805#comment-16704805 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237877929

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
       0)
   }

+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {
+    // min=2ms max=4ms
+    val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo())
+
+    testHarness.open()
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.setProcessingTime(1L)
+
+    // process without conversion rates
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L)))
+    testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L)))
+
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L)))
+
+    testHarness.processWatermark1(new Watermark(2L))
+    testHarness.processWatermark2(new Watermark(2L))
+
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L)))
+
+    expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L)))
+
+    testHarness.processWatermark1(new Watermark(5L))
+    testHarness.processWatermark2(new Watermark(5L))
+
+    verify(expectedOutput, testHarness.getOutput)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {

Review comment:
Conceptually, the tests have three phases: the build-up, where we add some elements and set the initial cleanup timer; the update of the timer (from the left or the right side, depending on the test); and a final record that would otherwise produce output but, because of the cleanup, does not.

I agree that there is code duplication in the build-up phase of the tests, but even if we merge the two tests, we will still have to go through the build-up phase for the second one. So I am not sure we would save much code.
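A minimal sketch of the timer arithmetic that these three phases exercise, written in Java as a self-contained model rather than against the Flink test harness. It assumes the rule implied by the comments in the tests under review (min=2ms, max=4ms): a new cleanup timer is registered at `now + max` only when `now + min` exceeds the currently registered cleanup time. `RetentionTimerSketch` and all of its members are hypothetical names, not part of the pull request, and the model keeps a single timer where the real operator keeps per-key state.

```java
public class RetentionTimerSketch {
    /** Sentinel for "no cleanup timer registered"; assumes non-negative processing times. */
    public static final long NO_TIMER = -1L;

    private final long minRetention;
    private final long maxRetention;
    private long now = 0L;
    private long cleanupTime = NO_TIMER;
    private boolean hasState = false;

    public RetentionTimerSketch(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Advances processing time and fires the cleanup timer if it is due. */
    public void setProcessingTime(long time) {
        now = time;
        if (cleanupTime != NO_TIMER && now >= cleanupTime) {
            hasState = false;        // cleanup drops the stored rows
            cleanupTime = NO_TIMER;  // and forgets the fired timer
        }
    }

    /** An element from either side touches the state and may push the timer out. */
    public void onElement() {
        hasState = true;
        if (cleanupTime == NO_TIMER || now + minRetention > cleanupTime) {
            cleanupTime = now + maxRetention;
        }
    }

    public long cleanupTime() {
        return cleanupTime;
    }

    public boolean hasState() {
        return hasState;
    }

    public static void main(String[] args) {
        RetentionTimerSketch op = new RetentionTimerSketch(2L, 4L);
        op.setProcessingTime(1L);
        op.onElement();              // phase 1, build-up: timer at 1 + 4 = 5
        op.setProcessingTime(4L);
        op.onElement();              // phase 2, update: 4 + 2 > 5, timer moves to 8
        op.setProcessingTime(5L);
        op.onElement();              // 5 + 2 < 8, timer stays at 8
        System.out.println("timer=" + op.cleanupTime() + " hasState=" + op.hasState());
        op.setProcessingTime(8L);    // phase 3: cleanup fires before the final record
        System.out.println("timer=" + op.cleanupTime() + " hasState=" + op.hasState());
    }
}
```

In this model the final record of each test arrives after `setProcessingTime(8L)` has already cleared the state, which is why it would find nothing to join against.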
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703309#comment-16703309 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237520314

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ##
@@ -205,6 +203,7 @@ class TemporalRowtimeJoin(
     val rightRowsSorted = getRightRowsSorted(rightRowtimeComparator)
     var lastUnprocessedTime = Long.MaxValue

+    // TODO: here we should check if the right side has anything or not (optimization).

Review comment:
@pnowojski What is your opinion on this one?
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703291#comment-16703291 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237515137

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
       0)
   }

+  // -- Event time TTL tests --
+
+  @Test
+  def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = {

Review comment:
I agree that this adds nothing to the test coverage. It uses the same code as the following tests and it serves to illustrate that "this is what would happen without State Retention enabled". I will remove it.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703254#comment-16703254 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237507423

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
[...]
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {

Review comment:
nit: rename `eventtime` to `rowtime`. Also, we do not use the "read" or "write" side nomenclature; we use either "probe/build" or "left/right".
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703275#comment-16703275 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237509884

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
[...]
+  @Test
+  def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = {

Review comment:
Yes.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703273#comment-16703273 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237509724

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
[...]
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
[...]
+    testHarness.processWatermark1(new Watermark(2L))
+    testHarness.processWatermark2(new Watermark(2L))

Review comment:
Yes.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703272#comment-16703272 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#discussion_r237509459 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ## @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase { 0) } + // -- Event time TTL tests -- + + @Test + def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupShouldSucceed(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates 
+testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +verify(expectedOutput, testHarness.getOutput) + +// this should clean-up the state +testHarness.setProcessingTime(5L) + +assert(testHarness.numKeyedStateEntries() == 1) // this is the index +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed + +testHarness.processWatermark1(new Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +// this should update the clean-up timer to 8 +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + +// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8) +testHarness.setProcessingTime(5L) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L))) +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new 
Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +// this should now clean up the state +testHarness.setProcessingTime(8L) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match + +testHarness.processWatermark1(new Watermark(10L)) +testHarness.processWatermark2(new Watermark(10L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupTimerGettingOverwrittenFromWriteSide(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtime
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703259#comment-16703259 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237478157

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala ##
@@ -120,6 +120,16 @@ abstract class BaseTwoInputStreamOperatorWithStateRetention(
     latestRegisteredCleanUpTimer.update(newCleanUpTime)
   }

+  protected def cleanUpLastTimer(): Unit = {

Review comment:
Shouldn't this be covered by some test?
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703255#comment-16703255 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237484681

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala ##
@@ -175,6 +166,13 @@ class TemporalRowtimeJoin(
     if (lastUnprocessedTime < Long.MaxValue) {
       registerTimer(lastUnprocessedTime)
     }
+
+    // if we have more state at any side, then update the timer, else clean it up.

Review comment:
I think we shouldn't execute this if `!stateCleaningEnabled`. Calling `rightState.iterator().hasNext` might not be free and can sometimes be quite costly.

Alternatively, you could modify `cleanUpState` to return whether the state is empty or not, but that would be more complicated, since `emitResultAndCleanUpState` already returns something.
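A minimal, self-contained sketch of the guard suggested in this comment, in Java rather than the Scala of `TemporalRowtimeJoin`. Everything here is an illustrative assumption: `GuardedStateCheckSketch`, `CountingState`, `TimerAction` and `decide` are hypothetical names, and the counting iterable merely stands in for a state backend whose iterator may be expensive to obtain. The point is only that the emptiness check is skipped entirely when state cleaning is disabled.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class GuardedStateCheckSketch {
    /** Hypothetical stand-in for keyed state; counts how often an iterator is requested. */
    public static final class CountingState implements Iterable<String> {
        private final List<String> rows;
        public int iteratorCalls = 0;

        public CountingState(List<String> rows) {
            this.rows = rows;
        }

        @Override
        public Iterator<String> iterator() {
            iteratorCalls++; // in a real state backend this access may be costly
            return rows.iterator();
        }
    }

    public enum TimerAction { NONE, UPDATE, CLEAN_UP }

    /** Decides what to do with the cleanup timer without touching state when cleaning is off. */
    public static TimerAction decide(
            boolean stateCleaningEnabled, Iterable<String> leftState, Iterable<String> rightState) {
        if (!stateCleaningEnabled) {
            return TimerAction.NONE; // no state access at all
        }
        boolean anyState = leftState.iterator().hasNext() || rightState.iterator().hasNext();
        return anyState ? TimerAction.UPDATE : TimerAction.CLEAN_UP;
    }

    public static void main(String[] args) {
        CountingState left = new CountingState(Collections.<String>emptyList());
        CountingState right = new CountingState(Collections.singletonList("Euro"));
        System.out.println(decide(false, left, right) + ", right iterators: " + right.iteratorCalls);
        System.out.println(decide(true, left, right) + ", right iterators: " + right.iteratorCalls);
    }
}
```

The alternative mentioned in the comment, having `cleanUpState` report emptiness itself, would avoid the extra iterator but changes a method whose caller already returns a value, so the guard is the smaller change.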
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703260#comment-16703260 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins.
URL: https://github.com/apache/flink/pull/6871#discussion_r237505985

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ##
@@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase {
[...]
+  @Test
+  def testEventTimeCleanupShouldSucceed(): Unit = {
[...]
+    testHarness.processWatermark1(new Watermark(2L))
+    testHarness.processWatermark2(new Watermark(2L))

Review comment:
Now that I think about it, I would add the following method to the `TwoInputStreamOperatorTestHarness`:

```
public void processWatermarks(Watermark mark) throws Exception {
	twoInputOperator.processWatermark1(mark);
	twoInputOperator.processWatermark2(mark);
}
```

This pattern of advancing both watermarks at the same time is duplicated in multiple (if not all) places...
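A minimal sketch of how the suggested helper would behave, using hypothetical stand-ins instead of the real Flink classes: `RecordingOperator` and `Harness` are invented for illustration and model watermarks as plain `long` timestamps. It only demonstrates that one call forwards the same watermark to both inputs, in order.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessWatermarksSketch {
    /** Hypothetical two-input operator that records calls instead of doing any work. */
    public static final class RecordingOperator {
        public final List<String> calls = new ArrayList<>();

        public void processWatermark1(long timestamp) {
            calls.add("wm1@" + timestamp);
        }

        public void processWatermark2(long timestamp) {
            calls.add("wm2@" + timestamp);
        }
    }

    /** Hypothetical harness exposing the combined helper from the review comment. */
    public static final class Harness {
        private final RecordingOperator twoInputOperator;

        public Harness(RecordingOperator twoInputOperator) {
            this.twoInputOperator = twoInputOperator;
        }

        /** Advances the watermark on both inputs with a single call. */
        public void processWatermarks(long timestamp) {
            twoInputOperator.processWatermark1(timestamp);
            twoInputOperator.processWatermark2(timestamp);
        }
    }

    public static void main(String[] args) {
        RecordingOperator operator = new RecordingOperator();
        new Harness(operator).processWatermarks(2L);
        System.out.println(operator.calls); // prints [wm1@2, wm2@2]
    }
}
```

With such a helper, each pair of `processWatermark1`/`processWatermark2` calls in the tests above would collapse into one line.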
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703261#comment-16703261 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#discussion_r237495768 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ## @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase { 0) } + // -- Event time TTL tests -- + + @Test + def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupShouldSucceed(): Unit = { Review comment: I think `testEventTimeCleanupTimerGettingOverwrittenFromReadSide` supersedes this one (it tests for the same thing and more), so you can just drop `testEventTimeCleanupShouldSucceed`. 
Generally speaking, I would also extend `testEventTimeCleanupTimerGettingOverwrittenFromReadSide` to cover `testEventTimeCleanupTimerGettingOverwrittenFromWriteSide` in one longer test. The problem is that, as it is now, there is a lot of code duplication between those tests: they have the same beginning, and instead of deduplicating them IMO it's better/easier to just merge them into one test. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703256#comment-16703256 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#discussion_r237490138 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ## @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase { 0) } + // -- Event time TTL tests -- + + @Test + def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = { Review comment: I think `testEventTimeScenarioWithoutAdvancingProcessingTime` is already covered by the existing tests (the one that I added originally), isn't it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703257#comment-16703257 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#discussion_r237500242 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ## @@ -530,6 +530,214 @@ class TemporalJoinHarnessTest extends HarnessTestBase { 0) } + // -- Event time TTL tests -- + + @Test + def testEventTimeScenarioWithoutAdvancingProcessingTime(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupShouldSucceed(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion 
rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +verify(expectedOutput, testHarness.getOutput) + +// this should clean-up the state +testHarness.setProcessingTime(5L) + +assert(testHarness.numKeyedStateEntries() == 1) // this is the index +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) // this should succeed + +testHarness.processWatermark1(new Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupTimerGettingOverwrittenFromReadSide(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRowtimeTemporalJoinInfo()) + +testHarness.open() +val expectedOutput = new ConcurrentLinkedQueue[Object]() + +testHarness.setProcessingTime(1L) + +// process without conversion rates +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 1L))) +testHarness.processElement2(new StreamRecord(CRow("Euro", 114L, 0L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 1L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new Watermark(2L)) +testHarness.processWatermark2(new Watermark(2L)) + +// this should update the clean-up timer to 8 +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 4L))) + +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 4L, "Euro", 114L, 0L))) + +// this should now do nothing (also it does not update the timer as 5 + 2ms (min) < 8) +testHarness.setProcessingTime(5L) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 5L))) +expectedOutput.add(new StreamRecord(CRow(2L, "Euro", 5L, "Euro", 114L, 0L))) + +testHarness.processWatermark1(new 
Watermark(5L)) +testHarness.processWatermark2(new Watermark(5L)) + +// this should now clean up the state +testHarness.setProcessingTime(8L) + +testHarness.processElement1(new StreamRecord(CRow(2L, "Euro", 7L))) // this should find no match + +testHarness.processWatermark1(new Watermark(10L)) +testHarness.processWatermark2(new Watermark(10L)) + +verify(expectedOutput, testHarness.getOutput) + +testHarness.close() + } + + @Test + def testEventTimeCleanupTimerGettingOverwrittenFromWriteSide(): Unit = { +// min=2ms max=4ms +val testHarness = createTestHarness(new OrdersRatesRo
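The comments in the tests above ("this should update the clean-up timer to 8", "5 + 2ms (min) < 8") rely on a specific update rule. A minimal sketch of that rule follows, assuming the behaviour described in the diff elsewhere in this thread: a new timer is registered at `now + maxRetentionTime`, but only when `now + minRetentionTime` exceeds the currently registered clean-up time. `CleanUpTimerModel` and its methods are hypothetical names for illustration; this is an in-memory model, not the Flink operator.

```scala
import scala.collection.mutable

/**
 * Illustrative model only (hypothetical class, not part of Flink).
 * Keeps at most one clean-up timer per key.
 */
final class CleanUpTimerModel(minRetentionTime: Long, maxRetentionTime: Long) {

  // key -> timestamp of the single clean-up timer registered for that key
  private val latestRegisteredCleanUpTimer = mutable.Map.empty[String, Long]

  /** Called for every record seen for `key` at processing time `now`. */
  def registerCleanUpTimer(key: String, now: Long): Unit = {
    val current = latestRegisteredCleanUpTimer.get(key)
    // Replace the timer only if there is none yet, or if the existing one
    // would fire before `now + minRetentionTime`.
    if (current.forall(now + minRetentionTime > _)) {
      latestRegisteredCleanUpTimer(key) = now + maxRetentionTime
    }
  }

  /** The currently registered clean-up time for `key`, if any. */
  def timerFor(key: String): Option[Long] = latestRegisteredCleanUpTimer.get(key)

  /** Fires all timers due at `now` and returns the keys to clean up. */
  def advanceProcessingTime(now: Long): List[String] = {
    val due = latestRegisteredCleanUpTimer.filter(_._2 <= now).keys.toList
    due.foreach(k => latestRegisteredCleanUpTimer.remove(k))
    due
  }
}
```

With min=2 and max=4 as in the tests, a record at processing time 1 registers a timer at 5, a record at time 4 moves it to 8 (since 4 + 2 > 5), and a record at time 5 leaves it at 8 (since 5 + 2 < 8).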
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703258#comment-16703258 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#discussion_r237506741 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/TemporalJoinHarnessTest.scala ## @@ -738,6 +738,136 @@ class TemporalJoinHarnessTest extends HarnessTestBase { testHarness.close() } + // -- Processing time TTL tests -- + + @Test + def testProcessingTimeScenarioWithoutAdvancingProcessingTime(): Unit = { Review comment: Same comments as for the `rowtime` versions: - `testProcessingTimeScenarioWithoutAdvancingProcessingTime` seems unnecessary - either deduplicate the test code between `testProcessingTimeCleanupShouldSucceed`, `testProcessingTimeCleanupTimerGettingOverwrittenFromReadSide` & `testProcessingTimeCleanupTimerGettingOverwrittenFromWriteSide` or collapse them into a single test. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701610#comment-16701610 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on issue #6871: [FLINK-10583][FLINK-10584][table] Add State Retention to temporal joins. URL: https://github.com/apache/flink/pull/6871#issuecomment-442382181 @pnowojski and @twalthr I updated the PR to expose the state retention logic to the joins. Please review and let me know if you have any comments. Also there is a question I had that is marked with a `TODO` and has to do with a (small) potential optimization. Let me know if that comment seems correct. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699124#comment-16699124 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on issue #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#issuecomment-441673911 Thanks @twalthr ! So I will rebase the PR and have another look and if travis gives green light, I will go on and merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699042#comment-16699042 ] ASF GitHub Bot commented on FLINK-10583: twalthr commented on issue #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#issuecomment-441662394 I think it is not worth the effort to convert code to Java that will be unnecessary once we have proper DataStream API TTL support. The discussion about Scala-free `flink-table` is also not done yet. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698934#comment-16698934 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on issue #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#issuecomment-441642893 @fhueske @twalthr @pnowojski What should we do with this PR, given the effort to remove Scala from the SQL runtime code? Should I rewrite it in Java? In addition, does this overlap with the effort to finalise TTL support at the backend level? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655388#comment-16655388 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226345699 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/BaseTwoInputStreamOperatorWithStateRetention.scala ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} +import java.util.Optional + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. + * + * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time + * timers in your business logic. The reason is that: + * + * 1) if your timers collide with clean up timers and you delete them, then state + * clean-up will not be performed, and + * + * 2) (this one is the reason why this class does not allow to override the onProcessingTime()) + * the onProcessingTime with your logic would be also executed on each clean up timer. + */ +@Internal +abstract class BaseTwoInputStreamOperatorWithStateRetention( Review comment: nit: I would still drop the `Base` name, but as you prefer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
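Point 1) of the class comment above (a subclass timer colliding with a clean-up timer) can be illustrated with a small model. This is a sketch under a stated assumption: processing-time timers are identified only by key and timestamp, so two registrations for the same key and timestamp collapse into a single timer. `TimerSetModel` and `TimerCollisionDemo` are hypothetical names, not Flink classes.

```scala
import scala.collection.mutable

/**
 * Illustrative model only, not Flink's timer service. Assumes that a timer
 * is identified by (key, timestamp) alone.
 */
final class TimerSetModel {
  private val timers = mutable.Set.empty[(String, Long)]

  def register(key: String, timestamp: Long): Unit = {
    timers += ((key, timestamp))
  }

  def delete(key: String, timestamp: Long): Unit = {
    timers -= ((key, timestamp))
  }

  def isRegistered(key: String, timestamp: Long): Boolean =
    timers.contains((key, timestamp))
}

object TimerCollisionDemo {
  /** Returns true if the clean-up timer is still registered afterwards. */
  def cleanUpTimerSurvives(cleanUpAt: Long, userTimerAt: Long): Boolean = {
    val model = new TimerSetModel
    model.register("Euro", cleanUpAt)   // registered by the base operator
    model.register("Euro", userTimerAt) // registered by subclass logic
    model.delete("Euro", userTimerAt)   // subclass cancels "its" timer
    model.isRegistered("Euro", cleanUpAt)
  }
}
```

Under this assumption, `cleanUpTimerSurvives(5L, 6L)` holds, while `cleanUpTimerSurvives(5L, 5L)` does not: deleting the colliding subclass timer also removes the clean-up timer, so state clean-up would never run for that key.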
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655357#comment-16655357 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on issue #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#issuecomment-431040705 done! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655341#comment-16655341 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226333482 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL( timerService = new SimpleTimerService(internalTimerService) } - override def processElement1(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - override def processElement2(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - private def registerProcessingCleanupTimer(): Unit = { + /** +* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this +* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`. +* +* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]] +* method is called. 
+*/ + protected def registerProcessingCleanUpTimer(): Unit = { if (stateCleaningEnabled) { val currentProcessingTime = timerService.currentProcessingTime() - val currentCleanupTime = cleanupTimeState.value() + val currentCleanUpTime = Option(cleanUpTimeState.value()) - if (currentCleanupTime == null -|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + if (currentCleanUpTime.isEmpty +|| (currentProcessingTime + minRetentionTime) > currentCleanUpTime.get) { -updateCleanupTimer(currentProcessingTime, currentCleanupTime) +updateCleanUpTimer(currentProcessingTime, currentCleanUpTime) } } } - /** -* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and -* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This -* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one -* cleanup timer is registered per key. -*/ - private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = { -if (currentCleanupTime != null) { - timerService.deleteProcessingTimeTimer(currentCleanupTime) + private def updateCleanUpTimer( + currentProcessingTime: JLong, + currentCleanUpTime: Option[JLong]): Unit = { + +val cleanUpTime: JLong = currentCleanUpTime.getOrElse(-1L) +if (cleanUpTime != -1) { Review comment: I agree about the cleanness of the first solution. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. 
> --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655324#comment-16655324 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226325589 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL( timerService = new SimpleTimerService(internalTimerService) } - override def processElement1(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - override def processElement2(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - private def registerProcessingCleanupTimer(): Unit = { + /** +* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this +* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`. +* +* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]] +* method is called. +*/ + protected def registerProcessingCleanUpTimer(): Unit = { if (stateCleaningEnabled) { val currentProcessingTime = timerService.currentProcessingTime() - val currentCleanupTime = cleanupTimeState.value() + val currentCleanUpTime = Option(cleanUpTimeState.value()) Review comment: According to @twalthr, we should avoid using Scala classes in runtime code. I didn't like that, but I changed my `Option` usages to Java's `Optional`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. 
> --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655327#comment-16655327 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226330817 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -39,6 +38,15 @@ import org.apache.flink.table.runtime.types.CRow * state to cleanup and what further action to take. * * This class takes care of maintaining at most one timer per key. + * + * IMPORTANT NOTE TO USERS: When extending this class, do not use processing time + * timers in your business logic. The reason is that: + * + * 1) if your timers collide with clean up timers and you delete them, then state + * clean-up will not be performed, and + * + * 2) (this one is the reason why this class does not allow to override the onProcessingTime()) + * the onProcessingTime with your logic would be also executed on each clean up timer. */ @PublicEvolving abstract class AbstractTwoInputStreamOperatorWithTTL( Review comment: Maybe rename it to `TwoInputStreamOperatorWithStateRetention`? I think the name TTL is easily confused with the TTL in the state backend, while "state retention" is something different. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. 
> --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655326#comment-16655326 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226329300 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -51,11 +59,11 @@ abstract class AbstractTwoInputStreamOperatorWithTTL( private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime private val stateCleaningEnabled: Boolean = minRetentionTime > 1 - private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val CLEANUP_TIMESTAMP = "cleanup-timestamp" private val TIMERS_STATE_NAME = "timers" // the latest registered cleanup timer - private var cleanupTimeState: ValueState[JLong] = _ + private var cleanUpTimeState: ValueState[JLong] = _ Review comment: rename variable to `latestRegisteredCleanUpTimer` and drop the comment. Name `cleanUpTimeState` doesn't tell much. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655325#comment-16655325 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226329029 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -80,51 +87,52 @@ abstract class AbstractTwoInputStreamOperatorWithTTL( timerService = new SimpleTimerService(internalTimerService) } - override def processElement1(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - override def processElement2(element: StreamRecord[CRow]): Unit = { -registerProcessingCleanupTimer() - } - - private def registerProcessingCleanupTimer(): Unit = { + /** +* If the user has specified a `minRetentionTime` and `maxRetentionTime`, this +* method registers a cleanup timer for `currentProcessingTime + minRetentionTime`. +* +* When this timer fires, the [[AbstractTwoInputStreamOperatorWithTTL.cleanUpState()]] +* method is called. 
+*/ + protected def registerProcessingCleanUpTimer(): Unit = { if (stateCleaningEnabled) { val currentProcessingTime = timerService.currentProcessingTime() - val currentCleanupTime = cleanupTimeState.value() + val currentCleanUpTime = Option(cleanUpTimeState.value()) - if (currentCleanupTime == null -|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + if (currentCleanUpTime.isEmpty +|| (currentProcessingTime + minRetentionTime) > currentCleanUpTime.get) { -updateCleanupTimer(currentProcessingTime, currentCleanupTime) +updateCleanUpTimer(currentProcessingTime, currentCleanUpTime) } } } - /** -* Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and -* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This -* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one -* cleanup timer is registered per key. -*/ - private def updateCleanupTimer(currentProcessingTime: JLong, currentCleanupTime: JLong): Unit = { -if (currentCleanupTime != null) { - timerService.deleteProcessingTimeTimer(currentCleanupTime) + private def updateCleanUpTimer( + currentProcessingTime: JLong, + currentCleanUpTime: Option[JLong]): Unit = { + +val cleanUpTime: JLong = currentCleanUpTime.getOrElse(-1L) +if (cleanUpTime != -1) { Review comment: nit: ``` if (!currentCleanUpTime.isEmpty()) { timerService.deleteProcessingTimeTimer(currentCleanUpTime.get()) } ``` In Java I would even use: ``` currentCleanUpTime.map(timerService::registerProcessingTimeTimer) ``` (or something along those lines, but I don't know the Scala syntax, and @twalthr says that Scala <-> Java lambda interoperability is poorly executed)
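The nit in the review comment above asks for the `Option` to be used directly instead of unwrapping it to a `-1` sentinel. A minimal sketch of that idiom in Scala follows, on the assumption that the intent is to delete the previously registered timer only when one exists. `RecordingTimerService` and `OptionTimerCleanup` are hypothetical stand-ins invented for this illustration; they are not part of Flink or of the pull request.

```scala
import java.lang.{Long => JLong}

// Hypothetical stand-in for the timer service: it only records which
// timestamps were passed to deleteProcessingTimeTimer.
final class RecordingTimerService {
  private var deleted = Vector.empty[Long]

  def deleteProcessingTimeTimer(time: Long): Unit = {
    deleted = deleted :+ time
  }

  def deletedTimers: Vector[Long] = deleted
}

object OptionTimerCleanup {
  // Deletes the previously registered timer if there is one, and does
  // nothing for None. The explicit lambda lets Scala unbox the
  // java.lang.Long before calling a method that takes a primitive long.
  def deleteIfPresent(
      service: RecordingTimerService,
      currentCleanUpTime: Option[JLong]): Unit = {
    currentCleanUpTime.foreach(t => service.deleteProcessingTimeTimer(t))
  }
}
```

With `foreach` there is no sentinel value and no call to `get`, so the empty case cannot be mishandled by accident.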
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655289#comment-16655289 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on issue #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#issuecomment-431026956 @pnowojski I integrated your comments. Let me know what you think.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654867#comment-16654867 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226215552 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamR
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654866#comment-16654866 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226215275 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = Review comment: The problem here is that we need access to the operator, and there are different types of operators used in the tests. I will try to create an operator with all the needed functionality, and put all the build/teardown logic in `Before` and `After` methods.
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654863#comment-16654863 ] ASF GitHub Bot commented on FLINK-10583: kl0u commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r226214592 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one +
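The registration rule quoted above can be sketched in isolation as a plain-Scala model. This is a simplified, single-key illustration and not the Flink operator: `CleanupTimerModel` and `onElement` are hypothetical names, the `stateCleaningEnabled` guard is omitted, and the timer service is reduced to one `Option[Long]`. It assumes that a new timer is registered at `currentProcessingTime + maxRetentionTime`, as the Scaladoc of `updateCleanupTimer` states.

```scala
// Simplified single-key model of the cleanup-timer rule (an assumption-laden
// sketch, not the operator from the pull request).
final class CleanupTimerModel(minRetentionTime: Long, maxRetentionTime: Long) {

  // Timestamp of the only timer that is currently registered, if any.
  private var registered: Option[Long] = None

  /** Applies the rule for one element arriving at `now` and returns the active timer. */
  def onElement(now: Long): Option[Long] = {
    // Replace the timer when none exists yet, or when the existing one would
    // fire before `now + minRetentionTime`.
    if (registered.forall(current => now + minRetentionTime > current)) {
      registered = Some(now + maxRetentionTime)
    }
    registered
  }
}
```

With the retention times used by the harness tests quoted in this thread (minimum 2 ms, maximum 4 ms), an element at time 1 registers a timer for 5. A second element at time 2 leaves it unchanged because 2 + 2 is not greater than 5, whereas a second element at time 4 moves it to 8 because 4 + 2 is greater than 5. These values match the `hasFiredTimers(5L)` and `hasFiredTimers(8L)` assertions in those tests.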
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653695#comment-16653695 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957516 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653689#comment-16653689 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225960810 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653697#comment-16653697 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957980 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653691#comment-16653691 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225956138 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value()
Review comment: I would avoid nulls and wrap this variable with `Option` and pass `Option` to the `updateCleanupTimer` method. Handling nullable variables without compilation errors enabled is very dangerous and error-prone.
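A minimal sketch of what the `Option` suggestion above could look like, written without any Flink dependency so it can be run on its own. `FakeValueState`, `CleanupTimerSketch`, `readCleanupTime` and `nextCleanupTime` are hypothetical names introduced for illustration only; they are not part of the pull request, and the real `updateCleanupTimer` signature may differ.

```scala
import java.lang.{Long => JLong}

// Hypothetical stand-in for a ValueState[JLong]: value() returns null
// when nothing has been stored for the current key.
final class FakeValueState(private var v: JLong) {
  def value(): JLong = v
  def update(n: JLong): Unit = { v = n }
}

object CleanupTimerSketch {

  // Option(...) turns a null reference into None, so callers never see null.
  def readCleanupTime(state: FakeValueState): Option[Long] =
    Option(state.value()).map(_.longValue())

  // Mirrors the condition in the quoted diff:
  //   currentCleanupTime == null ||
  //   (currentProcessingTime + minRetentionTime) > currentCleanupTime
  // Returns the timestamp of the timer to register, or None if the
  // existing timer can stay as it is.
  def nextCleanupTime(
      currentProcessingTime: Long,
      currentCleanupTime: Option[Long],
      minRetentionTime: Long,
      maxRetentionTime: Long): Option[Long] = {
    // forall is true for None, which covers the "no timer yet" case.
    val needsUpdate =
      currentCleanupTime.forall(t => currentProcessingTime + minRetentionTime > t)
    if (needsUpdate) Some(currentProcessingTime + maxRetentionTime) else None
  }
}
```

With the retention times used in the quoted test class (minimum 2 ms, maximum 4 ms), this sketch yields timers at 5, none, and 8 for the three scenarios exercised there, which agrees with the `hasFiredTimers(5L)` and `hasFiredTimers(8L)` assertions.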
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, plea
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653694#comment-16653694 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225954133 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = {
Review comment: As we discussed, I would lean slightly toward dropping the `final` keywords here, dropping the proxy/wrapper methods `onXYZ`, and relying on the user to do something like:
```
override def open() = {
  super.open()
  // my fancy logic.
}
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > > Key: FLINK-10583 > URL: https://issues.apache.org/jira/browse/FLINK-10583 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
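To make the trade-off in the review comment above concrete, here is a rough, Flink-free sketch of the two shapes being compared: a `final` lifecycle method that delegates to an `onOpen` hook, and a non-final method that subclasses override and chain with `super.open()`. All class and member names below are hypothetical stand-ins, not the classes from the pull request.

```scala
// Shape 1 (as in the quoted diff): open() is final and calls a hook.
abstract class HookBaseSketch {
  private var initialized: Boolean = false
  def isInitialized: Boolean = initialized

  final def open(): Unit = {
    initialized = true // base set-up always runs first
    onOpen()
  }

  // Subclasses put their own set-up here.
  protected def onOpen(): Unit
}

// Shape 2 (the reviewer's preference): open() is overridable and the
// subclass is responsible for calling super.open().
abstract class SuperCallBaseSketch {
  private var initialized: Boolean = false
  def isInitialized: Boolean = initialized

  def open(): Unit = { initialized = true }
}

class HookOperatorSketch extends HookBaseSketch {
  var sawInitializedBase: Boolean = false
  override protected def onOpen(): Unit = { sawInitializedBase = isInitialized }
}

class SuperCallOperatorSketch extends SuperCallBaseSketch {
  var sawInitializedBase: Boolean = false
  override def open(): Unit = {
    super.open() // forgetting this line would skip the base set-up
    sawInitializedBase = isInitialized
  }
}
```

The first shape cannot be misused by forgetting a `super` call but adds a wrapper method per lifecycle callback; the second keeps the familiar operator API at the cost of relying on subclass discipline.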
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653698#comment-16653698 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957668 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = { +onProcessElement1(element) +registerProcessingCleanupTimer() + } + + override final def processElement2(element: StreamRecord[CRow]): Unit = { +onProcessElement2(element) +registerProcessingCleanupTimer() + } + + private def registerProcessingCleanupTimer(): Unit = { +if (stateCleaningEnabled) { + val currentProcessingTime = timerService.currentProcessingTime() + val currentCleanupTime = cleanupTimeState.value() + + if (currentCleanupTime == null +|| (currentProcessingTime + minRetentionTime) > currentCleanupTime) { + +updateCleanupTimer(currentProcessingTime, currentCleanupTime) + } +} + } + + /** +* Deletes the processing time timer 
with timestamp `currentCleanupTime` (if any) and +* registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This +* method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653687#comment-16653687 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225955291 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join + +import java.lang.{Long => JLong} + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer} +import org.apache.flink.streaming.api.SimpleTimerService +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow + +/** + * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean + * up their state based on a TTL. This TTL should be specified in the provided + * [[StreamQueryConfig]]. + * + * For each known key, this operator registers a timer (in processing time) to + * fire after the TTL expires. When the timer fires, the subclass can decide which + * state to cleanup and what further action to take. + * + * This class takes care of maintaining at most one timer per key. 
+ */ +@PublicEvolving +abstract class AbstractTwoInputStreamOperatorWithTTL( +queryConfig: StreamQueryConfig) + extends AbstractStreamOperator[CRow] + with TwoInputStreamOperator[CRow, CRow, CRow] + with Triggerable[Any, VoidNamespace] { + + private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp" + private val TIMERS_STATE_NAME = "timers" + + // the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected var timerService: SimpleTimerService = _ + + override final def open(): Unit = { + +initializeTimerService() + +if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = +new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) +} + +onOpen() + } + + private def initializeTimerService(): Unit = { + +val internalTimerService = getInternalTimerService( + TIMERS_STATE_NAME, + VoidNamespaceSerializer.INSTANCE, + this) + +timerService = new SimpleTimerService(internalTimerService) + } + + override final def processElement1(element: StreamRecord[CRow]): Unit = {
Review comment: Here, I'm not sure. I would either do the same as for `open`, or leave these methods unimplemented and rely on the user to write something like:
```
override def processElement1(element) = {
  registerProcessingCleanupTimer()
  // my fancy logic
}
```
The argument in favour of the second option is that we may want to register/bump the timers only on `processElement2` (whenever we touch the build side) and not on `processElement1`.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for state retention to the Processing Time versioned joins. > --- > >
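The second option from the comment above (leave `processElement1`/`processElement2` abstract and expose the timer registration to subclasses) might look roughly like the following. This is a simplified, Flink-free sketch: elements are plain `String`s, the timer registration is reduced to a counter, and every name is a hypothetical stand-in rather than code from the pull request.

```scala
abstract class TimerRegisteringBaseSketch {
  private var registrations: Int = 0
  def registrationCount: Int = registrations

  // Exposed to subclasses so that they decide when the cleanup timer is bumped.
  protected def registerProcessingCleanupTimer(): Unit = { registrations += 1 }

  // Left unimplemented on purpose, as the comment proposes.
  def processElement1(element: String): Unit
  def processElement2(element: String): Unit
}

// Bumps the timer only when the build side (input 2) is touched.
class BuildSideOnlySketch extends TimerRegisteringBaseSketch {
  override def processElement1(element: String): Unit = {
    // probe side: join logic only, no timer registration
  }

  override def processElement2(element: String): Unit = {
    registerProcessingCleanupTimer()
    // build side: update state here
  }
}
```

Under this shape the base class no longer guarantees that every element refreshes the retention timer; that becomes a per-operator decision.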
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653690#comment-16653690 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225965180 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653692#comment-16653692 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225962699 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.harness + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.VoidNamespace +import org.apache.flink.streaming.api.operators.InternalTimer +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL +import org.apache.flink.table.runtime.types.CRow +import org.hamcrest.{Description, TypeSafeMatcher} +import org.junit.Test +import org.hamcrest.MatcherAssert.assertThat + +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala._ + +/** + * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]]. 
+ */ +class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase { + + private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _ + + private val streamQueryConfig = new TestStreamQueryConfig( +Time.milliseconds(2), +Time.milliseconds(4) + ) + + @Test + def normalScenarioWorks(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(10L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(2L) +testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(5L)) + } + + @Test + def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = { +val operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1)) + +testHarness.setProcessingTime(4L) +testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1)) + +testHarness.setProcessingTime(20L) +testHarness.close() + +assertThat(operator, hasFiredTimers(8L)) + } + + @Test + def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = { +val 
operator: StubOperatorWithTTLTimers = + new StubOperatorWithTTLTimers(streamQueryConfig) + +testHarness = createTestHarness(operator) + +testHarness.open() + +testHarness.setProcessingTime(1L) +testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653693#comment-16653693 ] ASF GitHub Bot commented on FLINK-10583: pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator. URL: https://github.com/apache/flink/pull/6871#discussion_r225957024 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/AbstractTwoInputStreamOperatorWithTTL.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, InternalTimer, Triggerable, TwoInputStreamOperator}
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+
+/**
+  * An abstract [[TwoInputStreamOperator]] that allows its subclasses to clean
+  * up their state based on a TTL. This TTL should be specified in the provided
+  * [[StreamQueryConfig]].
+  *
+  * For each known key, this operator registers a timer (in processing time) to
+  * fire after the TTL expires. When the timer fires, the subclass can decide which
+  * state to cleanup and what further action to take.
+  *
+  * This class takes care of maintaining at most one timer per key.
+  */
+@PublicEvolving
+abstract class AbstractTwoInputStreamOperatorWithTTL(
+    queryConfig: StreamQueryConfig)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace] {
+
+  private val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled: Boolean = minRetentionTime > 1
+
+  private val ACTIVE_CLEANUP_TIMESTAMP = "cleanup-timestamp"
+  private val TIMERS_STATE_NAME = "timers"
+
+  // the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
+
+  protected var timerService: SimpleTimerService = _
+
+  override final def open(): Unit = {
+
+    initializeTimerService()
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong](ACTIVE_CLEANUP_TIMESTAMP, Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
+
+    onOpen()
+  }
+
+  private def initializeTimerService(): Unit = {
+
+    val internalTimerService = getInternalTimerService(
+      TIMERS_STATE_NAME,
+      VoidNamespaceSerializer.INSTANCE,
+      this)
+
+    timerService = new SimpleTimerService(internalTimerService)
+  }
+
+  override final def processElement1(element: StreamRecord[CRow]): Unit = {
+    onProcessElement1(element)
+    registerProcessingCleanupTimer()
+  }
+
+  override final def processElement2(element: StreamRecord[CRow]): Unit = {
+    onProcessElement2(element)
+    registerProcessingCleanupTimer()
+  }
+
+  private def registerProcessingCleanupTimer(): Unit = {
+    if (stateCleaningEnabled) {
+      val currentProcessingTime = timerService.currentProcessingTime()
+      val currentCleanupTime = cleanupTimeState.value()
+
+      if (currentCleanupTime == null
+        || (currentProcessingTime + minRetentionTime) > currentCleanupTime) {
+
+        updateCleanupTimer(currentProcessingTime, currentCleanupTime)
+      }
+    }
+  }
+
+  /**
+    * Deletes the processing time timer with timestamp `currentCleanupTime` (if any) and
+    * registers a new one with timestamp `currentProcessingTime + maxRetentionTime`. This
+    * method is used by the `registerProcessingCleanupTimer()` to guarantee that only one
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653688#comment-16653688 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225959340

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+    Time.milliseconds(2),
+    Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(10L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+    val operator: StubOperatorWithTTLTimers =

Review comment:
Could you deduplicate the setup/closing logic of those methods? For example, extract

```
val operator: StubOperatorWithTTLTimers =
  new StubOperatorWithTTLTimers(streamQueryConfig)

testHarness = createTestHarness(operator)

testHarness.open()
```

to a `setup` method or to a `createStubHarnessWithTTLTimers()` method. If you pick the `setup` method, `close` could be moved to `tearDown` (but that would require `testHarness` to be a field, not a local variable).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.7.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653696#comment-16653696 ]

ASF GitHub Bot commented on FLINK-10583:

pnowojski commented on a change in pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871#discussion_r225961569

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AbstractTwoInputStreamOperatorWithTTLTest.scala ##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.runtime.state.VoidNamespace
+import org.apache.flink.streaming.api.operators.InternalTimer
+import org.apache.flink.streaming.api.scala.OutputTag
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector}
+import org.apache.flink.table.runtime.join.AbstractTwoInputStreamOperatorWithTTL
+import org.apache.flink.table.runtime.types.CRow
+import org.hamcrest.{Description, TypeSafeMatcher}
+import org.junit.Test
+import org.hamcrest.MatcherAssert.assertThat
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+
+/**
+  * Tests for the [[AbstractTwoInputStreamOperatorWithTTL]].
+  */
+class AbstractTwoInputStreamOperatorWithTTLTest extends HarnessTestBase {
+
+  private var testHarness: KeyedTwoInputStreamOperatorTestHarness[JLong, CRow, CRow, CRow] = _
+
+  private val streamQueryConfig = new TestStreamQueryConfig(
+    Time.milliseconds(2),
+    Time.milliseconds(4)
+  )
+
+  @Test
+  def normalScenarioWorks(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(10L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionSmallerThanCurrentCleanupTimeNoNewTimerRegistered(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(2L)
+    testHarness.processElement2(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+    testHarness.setProcessingTime(20L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(5L))
+  }
+
+  @Test
+  def whenCurrentTimePlusMinRetentionLargerThanCurrentCleanupTimeTimerIsUpdated(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "hello"), 1))
+
+    testHarness.setProcessingTime(4L)
+    testHarness.processElement1(new StreamRecord(CRow(1L: JLong, "world"), 1))
+
+    testHarness.setProcessingTime(20L)
+    testHarness.close()
+
+    assertThat(operator, hasFiredTimers(8L))
+  }
+
+  @Test
+  def otherSideToSameKeyStateAlsoUpdatesCleanupTimer(): Unit = {
+    val operator: StubOperatorWithTTLTimers =
+      new StubOperatorWithTTLTimers(streamQueryConfig)
+
+    testHarness = createTestHarness(operator)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1L)
+    testHarness.processElement1(new St
[jira] [Commented] (FLINK-10583) Add support for state retention to the Processing Time versioned joins.
[ https://issues.apache.org/jira/browse/FLINK-10583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653601#comment-16653601 ]

ASF GitHub Bot commented on FLINK-10583:

kl0u opened a new pull request #6871: [FLINK-10583][table] Add base TwoInputStreamOperator with TTL operator.
URL: https://github.com/apache/flink/pull/6871

## What is the purpose of the change

This is the first step for the implementation of FLINK-10583 and FLINK-10584. It introduces the `AbstractTwoInputStreamOperatorWithTTL`, which contains the basic functionality for implementing state TTL based on timers.

This operator makes sure that:
1) at most one timer is registered per key
2) both "sides" of the operator (processElement1 and processElement2) are treated equally when it comes to registering cleanup timers.

## Verifying this change

Added the `AbstractTwoInputStreamOperatorWithTTLTest` with tests for the class.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

R @pnowojski

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add support for state retention to the Processing Time versioned joins.
> ---
>
> Key: FLINK-10583
> URL: https://issues.apache.org/jira/browse/FLINK-10583
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.7.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
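
Editorial sketch, not part of the pull request: the timer policy described in the pull request above (at most one cleanup timer per key, refreshed from either input) can be modelled without any Flink dependency. The class below is a simplified stand-in: `CleanupTimerModel`, `onElement` and `advanceTo` are hypothetical names, a plain map replaces Flink's keyed state and timer service, and the condition `now + minRetentionTime > current` is taken from the quoted `registerProcessingCleanupTimer()`. It assumes processing time only moves forward.

```scala
import scala.collection.mutable

// Simplified model of the cleanup-timer policy; hypothetical, not Flink code.
final class CleanupTimerModel(minRetentionTime: Long, maxRetentionTime: Long) {

  // Mirrors `stateCleaningEnabled` in the quoted operator.
  private val stateCleaningEnabled: Boolean = minRetentionTime > 1

  // key -> timestamp of the single cleanup timer registered for that key
  private val cleanupTimes = mutable.Map.empty[Long, Long]

  /** Call once per record from either input, at processing time `now`. */
  def onElement(key: Long, now: Long): Unit =
    if (stateCleaningEnabled) {
      val current = cleanupTimes.get(key)
      // No timer yet, or the existing one would fire before `now + minRetentionTime`.
      if (current.forall(c => now + minRetentionTime > c)) {
        // Overwriting the entry stands in for "delete the old timer, register a new one".
        cleanupTimes(key) = now + maxRetentionTime
      }
    }

  /** Advances time to `now` and returns the timer timestamps that fired, ascending. */
  def advanceTo(now: Long): Seq[Long] = {
    val due = cleanupTimes.filter { case (_, ts) => ts <= now }
    cleanupTimes --= due.keys
    due.values.toSeq.sorted
  }
}
```

With a minimum retention of 2 and a maximum of 4, as in the quoted test configuration, an element at time 1 registers a timer for 5. A second element at time 2 leaves it untouched because 2 + 2 is not greater than 5, while a second element at time 4 moves it to 8. This matches the `hasFiredTimers(5L)` and `hasFiredTimers(8L)` expectations in the quoted tests.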