mxm commented on a change in pull request #12759:
URL: https://github.com/apache/beam/pull/12759#discussion_r482840511
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
##########
@@ -910,21 +892,16 @@ public void testEnsureStateCleanupOnFinalWatermark()
throws Exception {
operator.keyedStateInternals.state(
stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
+ // No timers have been set for cleanup
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+ // State has been created
assertThat(testHarness.numKeyedStateEntries(), is(1));
// Generate final watermark to trigger state cleanup
testHarness.processWatermark(
new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1).getMillis()));
assertThat(testHarness.numKeyedStateEntries(), is(0));
-
- // Close should not repeat state cleanup
Review comment:
I believe the cleanup ought to be repeated if there is new state. If
there is none, then no cleanup will be performed because we keep track of the
state descriptors created and clear that set on clean-up.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]