mxm commented on a change in pull request #12952:
URL: https://github.com/apache/beam/pull/12952#discussion_r500826435



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -427,11 +427,29 @@ def test_sql(self):
 
 
 class FlinkRunnerTestStreaming(FlinkRunnerTest):
+  def __init__(self, *args, **kwargs):
+    super(FlinkRunnerTestStreaming, self).__init__(*args, **kwargs)
+    self.enable_commit = False
+
+  def setUp(self):
+    self.enable_commit = False
+
   def create_options(self):
     options = super(FlinkRunnerTestStreaming, self).create_options()
     options.view_as(StandardOptions).streaming = True
+    if self.enable_commit:
+      options._all_options['checkpointing_interval'] = 3000
+      options._all_options['shutdown_sources_after_idle_ms'] = 60000
     return options
 
+  def test_callbacks_with_exception(self):
+    self.enable_commit = True
+    super(FlinkRunnerTest, self).test_callbacks_with_exception()
+
+  def test_register_finalizations(self):
+    self.enable_commit = True
+    super(FlinkRunnerTest, self).test_register_finalizations()

Review comment:
       It definitely requires changes in Flink for the finalization to be 
persisted without checkpointing by restoring the state from materialized 
intermediate results. However, if the user is fine with the job re-executing in 
case of failures, we may still offer bundle finalization. That may be 
acceptable as opposed to piling up to-be-acknowledged bundles which is likely 
going to fail the job. Perhaps this should be opt-in.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to