damccorm commented on code in PR #32136:
URL: https://github.com/apache/beam/pull/32136#discussion_r1716502208
##########
sdks/python/apache_beam/transforms/core_test.py:
##########
@@ -170,6 +179,68 @@ def test_flatten_mismatched_windows(self):
_ = (source1, source2, source3) | "flatten" >> beam.Flatten()
+class ExceptionHandlingTest(unittest.TestCase):
+  def test_routes_failures(self):
+    with beam.Pipeline() as pipeline:
+      good, bad = (
+        pipeline | beam.Create(['abc', 'long_word', 'foo', 'bar', 'foobar'])
+        | beam.ParDo(TestDoFn9()).with_exception_handling()
+      )
+      bad_elements = bad | beam.Map(lambda x: x[0])
+      assert_that(good, equal_to(['abc', 'foo', 'bar']), 'good')
+      assert_that(bad_elements, equal_to(['long_word', 'foobar']), 'bad')
+
+  def test_handles_callbacks(self):
+    with tempfile.TemporaryDirectory() as tmp_dirname:
+      tmp_path = os.path.join(tmp_dirname, 'tmp_filename')
+      file_contents = 'random content'
+
+      def failure_callback(e, el):
+        if type(e) is not ValueError:
+          raise Exception(f'Failed to pass in correct exception, received {e}')
+        if el != 'foobar':
+          raise Exception(f'Failed to pass in correct element, received {el}')
+        f = open(tmp_path, "a")
+        logging.warning(tmp_path)
+        f.write(file_contents)
Review Comment:
I found this didn't work. I'm not totally sure why, but I think it comes down
to how the function's scope is handled as it is passed through Beam. It seems
to be referencing a copy of the variable, so I'm guessing it gets copied by
value somewhere along the way. That may be related to us spinning up new
threads to handle pieces of this logic.
Regardless, I'm inclined to leave it rather than dig in further, since it
is still effectively testing correctness at this point.
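One possible explanation, offered as an assumption and not verified in this PR: if the callback is serialized and rebuilt before it runs, any in-memory state it captured is a copy, so mutations never reach the object the test is holding. The sketch below imitates that with a plain `pickle` round trip. `RecordingCallback` is a hypothetical stand-in, not Beam code, and the round trip only approximates whatever Beam actually does to the callback.

```python
import pickle


class RecordingCallback:
  """Hypothetical failure callback that records elements in memory."""
  def __init__(self):
    self.seen = []

  def __call__(self, exception, element):
    self.seen.append(element)


original = RecordingCallback()

# Assumption: stand-in for the callback being serialized and rebuilt
# before it is invoked. The rebuilt object has its own `seen` list.
shipped = pickle.loads(pickle.dumps(original))
shipped(ValueError('too long'), 'foobar')

print(shipped.seen)   # ['foobar'], the rebuilt copy recorded the element
print(original.seen)  # [], the object held by the test was never touched
```

If that is what is happening, it would also explain why writing to a file works: the path string is copied too, but the copy still names the same file on disk.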
##########
sdks/python/apache_beam/transforms/core_test.py:
##########
@@ -170,6 +179,68 @@ def test_flatten_mismatched_windows(self):
_ = (source1, source2, source3) | "flatten" >> beam.Flatten()
+class ExceptionHandlingTest(unittest.TestCase):
+  def test_routes_failures(self):
+    with beam.Pipeline() as pipeline:
+      good, bad = (
+        pipeline | beam.Create(['abc', 'long_word', 'foo', 'bar', 'foobar'])
+        | beam.ParDo(TestDoFn9()).with_exception_handling()
+      )
+      bad_elements = bad | beam.Map(lambda x: x[0])
Review Comment:
Good call, updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.