tweise commented on a change in pull request #12759:
URL: https://github.com/apache/beam/pull/12759#discussion_r482392713
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -753,6 +754,13 @@ private void maybeEmitWatermark(long watermark) {
LOG.debug("Emitting watermark {}", watermark);
currentOutputWatermark = watermark;
output.emitWatermark(new Watermark(watermark));
+
+ // Check if the final watermark was triggered to perform state cleanup for global window
+ if (keyedStateInternals != null
+ && currentOutputWatermark
Review comment:
The existing solution checks that we have not already reached the
watermark to avoid repeated execution of the cleanup code. Please carry that
over.
Also, why check for `keyedStateInternals`? See
https://github.com/apache/beam/pull/12733#discussion_r480015675
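For illustration, a minimal sketch of the kind of guard meant here (`clearGlobalState()` is a
placeholder name, not the method this PR introduces):

```java
// Only run the cleanup when the output watermark crosses the final watermark for
// the first time, so that a repeated max watermark does not re-trigger it.
long maxWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
boolean alreadyReachedMax = currentOutputWatermark >= maxWatermark;
currentOutputWatermark = watermark;
output.emitWatermark(new Watermark(watermark));
if (!alreadyReachedMax && watermark >= maxWatermark) {
  keyedStateInternals.clearGlobalState(); // placeholder for the actual cleanup call
}
```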
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
##########
@@ -910,21 +892,16 @@ public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
operator.keyedStateInternals.state(
stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
+ // No timers have been set for cleanup
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+ // State has been created
assertThat(testHarness.numKeyedStateEntries(), is(1));
// Generate final watermark to trigger state cleanup
testHarness.processWatermark(
new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1).getMillis()));
assertThat(testHarness.numKeyedStateEntries(), is(0));
-
- // Close should not repeat state cleanup
Review comment:
Keep this to check that the cleanup isn't repeated even when the
watermark is repeated.
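Roughly along these lines, based on the removed check (a sketch; the exact assertions may differ):

```java
// Re-emitting the final watermark must not run the cleanup a second time.
testHarness.processWatermark(
    new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1).getMillis()));
assertThat(testHarness.numKeyedStateEntries(), is(0));

// Close should not repeat state cleanup
testHarness.close();
```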
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1338,19 @@ public void setTimer(
@Deprecated
@Override
public void setTimer(TimerData timer) {
+ if (timer.getTimestamp().isAfter(GlobalWindow.INSTANCE.maxTimestamp())) {
Review comment:
The namespace we use for cleanup is based on `GlobalWindow` and so
should be the condition here. I think we can prioritize clarity over avoiding
the duplication of a single conditional return statement, which should be
covered by the unit tests.
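For clarity, a sketch of that condition expressed against `GlobalWindow` (where exactly it gets
duplicated across the `setTimer` overloads is left open):

```java
if (timer.getTimestamp().isAfter(GlobalWindow.INSTANCE.maxTimestamp())) {
  // The cleanup namespace is based on GlobalWindow, so timers past the global
  // window's max timestamp are never registered; cleanup is instead triggered
  // by the final watermark.
  return;
}
```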
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -114,16 +127,27 @@ public K getKey() {
return address.getSpec().bind(address.getId(), new FlinkStateBinder(namespace, context));
}
- public void clearBagStates(StateNamespace namespace, StateTag<? extends BagState> address)
- throws Exception {
- CoderTypeSerializer typeSerializer = new CoderTypeSerializer<>(VoidCoder.of());
- flinkStateBackend.applyToAllKeys(
- namespace.stringKey(),
- StringSerializer.INSTANCE,
- new ListStateDescriptor<>(address.getId(), typeSerializer),
- (key, state) -> {
- state.clear();
- });
+ /**
+ * Allows to clear all state for the global window when the maximum watermark arrives. We do
+ * not clean up the global window state via timers because we are not guaranteed to ever receive
+ * the final watermark which would lead to an unbounded number of keys and cleanup timers.
+ * Instead, the cleanup code below should be run when we finally receive the max watermark.
Review comment:
I would remove "because we are not guaranteed to ever receive the final
watermark" because that is misleading. The problem is that we accumulate too
many timers, depending on the number of keys.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -139,17 +163,27 @@ private FlinkStateBinder(StateNamespace namespace, StateContext<?> stateContext)
@Override
public <T2> ValueState<T2> bindValue(
String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
- return new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+ ValueStateDescriptor<T2> valueStateDescriptor =
+ new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder));
+ globalWindowStateDescriptors.add(valueStateDescriptor);
+ return new FlinkValueState<>(flinkStateBackend, id, namespace, valueStateDescriptor);
}
@Override
public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, Coder<T2> elemCoder) {
- return new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+ ListStateDescriptor<T2> listStateDescriptor =
+ new ListStateDescriptor<>(id, new CoderTypeSerializer<>(elemCoder));
+ globalWindowStateDescriptors.add(listStateDescriptor);
Review comment:
This is executed on every state access in the portable runner. It might
be possible to reuse the descriptors. How large would the net benefit be?
Probably small compared to the Fn API overhead.
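If reuse turns out to be worthwhile, caching could look roughly like this (the cache field and
helper are hypothetical, not part of this PR):

```java
// Fragment of the state binder: reuse one descriptor per state id instead of
// allocating a new one on every access.
private final Map<String, ListStateDescriptor<?>> bagDescriptorCache = new HashMap<>();

@SuppressWarnings("unchecked")
private <T2> ListStateDescriptor<T2> bagDescriptorFor(String id, Coder<T2> elemCoder) {
  return (ListStateDescriptor<T2>)
      bagDescriptorCache.computeIfAbsent(
          id, key -> new ListStateDescriptor<>(key, new CoderTypeSerializer<>(elemCoder)));
}
```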
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -75,7 +80,14 @@
public class FlinkStateInternals<K> implements StateInternals {
private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
- private Coder<K> keyCoder;
+ private final Coder<K> keyCoder;
+
+ /**
+ * A set which contains all state descriptors created in the global window. Used for cleanup on
Review comment:
As implemented this contains all state descriptors, regardless of the
window?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]