[ https://issues.apache.org/jira/browse/BEAM-8566?focusedWorklogId=340407&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-340407 ]
ASF GitHub Bot logged work on BEAM-8566:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/19 09:24
            Start Date: 08/Nov/19 09:24
    Worklog Time Spent: 10m
      Work Description: mxm commented on pull request #10007: [BEAM-8566] Fix checkpoint buffering when another bundle is started during checkpointing
URL: https://github.com/apache/beam/pull/10007#discussion_r344078876

 ##########
 File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 ##########
 @@ -1434,6 +1434,98 @@ public void finishBundle(FinishBundleContext context) {
      testHarness.close();
    }

 +  @Test
 +  public void testCheckpointBufferingWithMultipleBundles() throws Exception {
 +    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
 +    options.setMaxBundleSize(10L);
 +    options.setCheckpointingInterval(1L);
 +
 +    TupleTag<String> outputTag = new TupleTag<>("main-output");
 +
 +    StringUtf8Coder coder = StringUtf8Coder.of();
 +    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
 +        WindowedValue.getValueOnlyCoder(coder);
 +
 +    DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
 +        new DoFnOperator.MultiOutputOutputManagerFactory(
 +            outputTag,
 +            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
 +
 +    Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
 +        () ->
 +            new DoFnOperator<>(
 +                new IdentityDoFn(),
 +                "stepName",
 +                windowedValueCoder,
 +                null,
 +                Collections.emptyMap(),
 +                outputTag,
 +                Collections.emptyList(),
 +                outputManagerFactory,
 +                WindowingStrategy.globalDefault(),
 +                new HashMap<>(), /* side-input mapping */
 +                Collections.emptyList(), /* side inputs */
 +                options,
 +                null,
 +                null,
 +                DoFnSchemaInformation.create(),
 +                Collections.emptyMap());
 +
 +    DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
 +    @SuppressWarnings("unchecked")
 +    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
 +        new OneInputStreamOperatorTestHarness<>(doFnOperator);
 +
 +    testHarness.open();
 +
 +    // start a bundle
 +    testHarness.processElement(
 +        new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element")));
 +
 +    // Set the callback which will trigger two bundles upon checkpointing
 +    doFnOperator.setBundleFinishedCallback(

 Review comment:
   Sure!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 340407)
    Time Spent: 2h  (was: 1h 50m)

> Checkpoint buffer is flushed prematurely when another bundle is started
> during checkpointing
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-8566
>                 URL: https://issues.apache.org/jira/browse/BEAM-8566
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.17.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> As part of a checkpoint, the current bundle is finalized. When the bundle is
> finalized, the watermark, which is currently held back, may also advance, and
> that can cause another bundle to start. When a new bundle is started, any
> to-be-buffered items from the previous bundle for the pending checkpoint may
> be emitted. This should not happen.
> This only affects portable pipelines, where we have to hold back the watermark
> due to the asynchronous processing of elements.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
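
The failure mode in the issue description can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `CheckpointBufferSketch` and all of its members are hypothetical names invented for illustration, and none of this is Beam's `DoFnOperator` code. It assumes that output produced while a checkpoint is in progress goes into a buffer, and it compares an operator that flushes that buffer whenever a bundle starts with one that keeps it until the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of checkpoint buffering; not Beam's DoFnOperator. */
class CheckpointBufferSketch {
  final List<String> emitted = new ArrayList<>(); // elements sent downstream
  final List<String> buffer = new ArrayList<>();  // elements held for the pending checkpoint
  private final boolean flushOnBundleStart;       // true models the premature flush
  private boolean checkpointing;

  CheckpointBufferSketch(boolean flushOnBundleStart) {
    this.flushOnBundleStart = flushOnBundleStart;
  }

  /** Output is buffered while a checkpoint is in progress, otherwise emitted directly. */
  void output(String element) {
    if (checkpointing) {
      buffer.add(element);
    } else {
      emitted.add(element);
    }
  }

  /** Starting a bundle flushes the buffer only in the premature-flush variant. */
  void startBundle() {
    if (flushOnBundleStart) {
      flushBuffer();
    }
  }

  void flushBuffer() {
    emitted.addAll(buffer);
    buffer.clear();
  }

  /** Finalizes the current bundle; the advancing watermark then starts another bundle. */
  void checkpoint() {
    checkpointing = true;
    output("from finishBundle");
    startBundle();
    output("from second bundle");
    checkpointing = false;
  }

  /** Assumption of this model: the buffer is released once the checkpoint has completed. */
  void checkpointComplete() {
    flushBuffer();
  }

  /** Returns what reached downstream before the checkpoint completed. */
  static List<String> emittedBeforeCheckpointCompletes(boolean flushOnBundleStart) {
    CheckpointBufferSketch op = new CheckpointBufferSketch(flushOnBundleStart);
    op.checkpoint();
    return new ArrayList<>(op.emitted);
  }

  public static void main(String[] args) {
    System.out.println("premature flush: " + emittedBeforeCheckpointCompletes(true));
    System.out.println("buffer kept:     " + emittedBeforeCheckpointCompletes(false));
  }
}
```

In this model the premature-flush variant lets the element produced while finishing the bundle reach downstream before the checkpoint completes, whereas the other variant emits nothing until `checkpointComplete()` is called.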