[ 
https://issues.apache.org/jira/browse/BEAM-6778?focusedWorklogId=213138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-213138
 ]

ASF GitHub Bot logged work on BEAM-6778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Mar/19 14:23
            Start Date: 14/Mar/19 14:23
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #7937: [BEAM-6778] Enable Bundle Finalization in Python SDK harness over FnApi
URL: https://github.com/apache/beam/pull/7937#discussion_r265579163
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##########
 @@ -753,6 +753,81 @@ def create_pipeline(self):
         runner=fn_api_runner.FnApiRunner(bundle_repeat=3))
 
 
+class EventRecorder(object):
+  def __init__(self):
+    self.work_path = os.getcwd() + '/EventRecorder'
+    if os.path.exists(self.work_path):
+      self.cleanup()
+    os.mkdir(self.work_path)
+
+  def record(self, file_name):
+    file_path = os.path.join(self.work_path, str(file_name) + '.txt')
+    open(file_path, 'a').close()
+
+  def events(self):
+    return sorted(os.listdir(self.work_path))
+
+  def cleanup(self):
+    for file in os.listdir(self.work_path):
+      os.remove(os.path.join(self.work_path, file))
+    os.rmdir(self.work_path)
+
+
+class FnApiRunnerFinalizeBundleTest(FnApiRunnerTest):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(
+            default_environment=beam_runner_api_pb2.Environment(
+                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+
+  def test_no_finalize_call_without_register(self):
+    event_recorder = EventRecorder()
+    elements_list = [1, 2]
+
+    class FinalizebleDoFnWithoutRegister(beam.DoFn):
+      def process(
+          self,
+          element,
+          bundle_finalizer=beam.DoFn.BundleFinalizerParam):
+        yield element
+
+    with self.create_pipeline() as p:
+      res = (p
+             | beam.Create(elements_list)
+             | beam.ParDo(FinalizebleDoFnWithoutRegister()))
+      assert_that(res, equal_to(elements_list))
+      p.run().wait_until_finish()
+
+    results = event_recorder.events()
+    event_recorder.cleanup()
+    assert results == []
+
+  def test_register_finalizations(self):
+    event_recorder = EventRecorder()
+    elements_list = [1, 2]
 
 Review comment:
   Use string elements to avoid needing to do str(...). 
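
   A minimal sketch of what this suggestion could look like, assuming the
   elements are only used as file names by the recorder. The class below is a
   trimmed-down, hypothetical variant of the EventRecorder in the diff; the use
   of tempfile.mkdtemp and the run_sketch helper are assumptions added for
   illustration and are not part of the PR.

```python
import os
import shutil
import tempfile


class EventRecorder(object):
  """Hypothetical, simplified recorder: one empty file per recorded event."""

  def __init__(self, work_path):
    self.work_path = work_path

  def record(self, file_name):
    # file_name is already a string, so no str(...) conversion is needed.
    file_path = os.path.join(self.work_path, file_name + '.txt')
    open(file_path, 'a').close()

  def events(self):
    return sorted(os.listdir(self.work_path))


def run_sketch():
  # Assumption: a temporary directory stands in for the PR's work directory.
  work_path = tempfile.mkdtemp()
  try:
    recorder = EventRecorder(work_path)
    elements_list = ['1', '2']  # string elements instead of [1, 2]
    for element in elements_list:
      recorder.record(element)
    return recorder.events()
  finally:
    shutil.rmtree(work_path)
```

   With string elements, the same list can be passed to beam.Create and used
   directly as file names, so the recorder does not have to convert anything.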
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 213138)
    Time Spent: 1h 50m  (was: 1h 40m)

> Enable Bundle Finalization in Python SDK
> ----------------------------------------
>
>                 Key: BEAM-6778
>                 URL: https://issues.apache.org/jira/browse/BEAM-6778
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-harness
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
