Repository: incubator-beam Updated Branches: refs/heads/python-sdk b28e71979 -> 1d53e28e9
Revert the changes to the accidentally reverted files Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4760fe3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4760fe3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4760fe3 Branch: refs/heads/python-sdk Commit: a4760fe3fcd5a0dda31d137aee29b3cd825de9cf Parents: b28e719 Author: Ahmet Altay <al...@google.com> Authored: Mon Aug 15 15:00:32 2016 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Mon Aug 15 15:00:32 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 25 ++++++++++++++++---- sdks/python/apache_beam/io/gcsio.py | 9 +++++-- sdks/python/apache_beam/io/gcsio_test.py | 23 +++++++++++++++++- sdks/python/apache_beam/runners/common.pxd | 2 +- sdks/python/apache_beam/runners/common.py | 18 ++++++++++---- .../python/apache_beam/runners/direct_runner.py | 9 +++---- .../runners/inprocess/transform_evaluator.py | 8 +++---- .../apache_beam/transforms/aggregator_test.py | 8 +++---- 8 files changed, 75 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index b1e091b..a6ce26a 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -283,7 +283,17 @@ class _CompressionType(object): self.identifier = identifier def __eq__(self, other): - return self.identifier == other.identifier + return (isinstance(other, _CompressionType) and + self.identifier == other.identifier) + + def __hash__(self): + return hash(self.identifier) + + def __ne__(self, other): + return not 
self.__eq__(other) + + def __repr__(self): + return '_CompressionType(%s)' % self.identifier class CompressionTypes(object): @@ -530,15 +540,22 @@ class FileSink(iobase.Sink): channel_factory.rename(old_name, final_name) except IOError as e: # May have already been copied. - exists = channel_factory.exists(final_name) + try: + exists = channel_factory.exists(final_name) + except Exception as exists_e: # pylint: disable=broad-except + logging.warning('Exception when checking if file %s exists: ' + '%s', final_name, exists_e) + # Returning original exception after logging the exception from + # exists() call. + return (None, e) if not exists: logging.warning(('IOError in _rename_file. old_name: %s, ' 'final_name: %s, err: %s'), old_name, final_name, e) - return(None, e) + return (None, e) except Exception as e: # pylint: disable=broad-except logging.warning(('Exception in _rename_file. old_name: %s, ' 'final_name: %s, err: %s'), old_name, final_name, e) - return(None, e) + return (None, e) return (final_name, None) # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 88fcfb8..7bb532c 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -234,8 +234,13 @@ class GcsIO(object): object=object_path) self.client.objects.Get(request) # metadata return True - except IOError: - return False + except HttpError as http_error: + if http_error.status_code == 404: + # HTTP 404 indicates that the file did not exist + return False + else: + # We re-raise all other exceptions + raise @retry.with_exponential_backoff( retry_filter=retry.retry_on_server_errors_and_timeout_filter) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 7b15ef3..99c99b3 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -29,6 +29,7 @@ from apitools.base.py.exceptions import HttpError from apache_beam.internal.clients import storage from apache_beam.io import gcsio +from mock import patch class FakeGcsClient(object): @@ -79,7 +80,8 @@ class FakeGcsObjects(object): def Get(self, get_request, download=None): # pylint: disable=invalid-name f = self.get_file(get_request.bucket, get_request.object) if f is None: - raise ValueError('Specified object does not exist.') + # Failing with a HTTP 404 if file does not exist. + raise HttpError({'status':404}, None, None) if download is None: return f.get_metadata() else: @@ -189,6 +191,25 @@ class TestGCSIO(unittest.TestCase): self.client = FakeGcsClient() self.gcs = gcsio.GcsIO(self.client) + def test_exists(self): + file_name = 'gs://gcsio-test/dummy_file' + file_size = 1234 + self._insert_random_file(self.client, file_name, file_size) + self.assertFalse(self.gcs.exists(file_name + 'xyz')) + self.assertTrue(self.gcs.exists(file_name)) + + @patch.object(FakeGcsObjects, 'Get') + def test_exists_failure(self, mock_get): + # Raising an error other than 404. Raising 404 is a valid failure for + # exists() call. 
+ mock_get.side_effect = HttpError({'status':400}, None, None) + file_name = 'gs://gcsio-test/dummy_file' + file_size = 1234 + self._insert_random_file(self.client, file_name, file_size) + with self.assertRaises(HttpError) as cm: + self.gcs.exists(file_name) + self.assertEquals(400, cm.exception.status_code) + def test_size(self): file_name = 'gs://gcsio-test/dummy_file' file_size = 1234 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 7acd049..5cd4cf8 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -32,7 +32,7 @@ cdef class DoFnRunner(Receiver): cdef object dofn cdef object dofn_process cdef object window_fn - cdef object context # TODO(robertwb): Make this a DoFnContext + cdef DoFnContext context cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index c017704..67277c3 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -59,12 +59,16 @@ class DoFnRunner(Receiver): kwargs, side_inputs, windowing, - context, - tagged_receivers, + context=None, + tagged_receivers=None, logger=None, step_name=None, # Preferred alternative to logger - logging_context=None): + # TODO(robertwb): Remove once all runners are updated. + logging_context=None, + # Preferred alternative to context + # TODO(robertwb): Remove once all runners are updated. 
+ state=None): if not args and not kwargs: self.dofn = fn self.dofn_process = fn.process @@ -85,10 +89,16 @@ class DoFnRunner(Receiver): self.dofn_process = lambda context: fn.process(context, *args, **kwargs) self.window_fn = windowing.windowfn - self.context = context self.tagged_receivers = tagged_receivers self.step_name = step_name + if state: + assert context is None + self.context = DoFnContext(self.step_name, state=state) + else: + assert context is not None + self.context = context + if logging_context: self.logging_context = logging_context else: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py index e0df439..a62ddf7 100644 --- a/sdks/python/apache_beam/runners/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct_runner.py @@ -43,7 +43,6 @@ from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.runners.runner import PValueCache -from apache_beam.transforms import DoFnProcessContext from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn @@ -138,9 +137,6 @@ class DirectPipelineRunner(PipelineRunner): @skip_if_cached def run_ParDo(self, transform_node): transform = transform_node.transform - # TODO(gildea): what is the appropriate object to attach the state to? 
- context = DoFnProcessContext(label=transform.label, - state=DoFnState(self._counter_factory)) side_inputs = [self._cache.get_pvalue(view) for view in transform_node.side_inputs] @@ -176,8 +172,9 @@ class DirectPipelineRunner(PipelineRunner): runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs, side_inputs, transform_node.inputs[0].windowing, - context, TaggedReceivers(), - step_name=transform_node.full_label) + tagged_receivers=TaggedReceivers(), + step_name=transform_node.full_label, + state=DoFnState(self._counter_factory)) runner.start() for v in self._cache.get_pvalue(transform_node.inputs[0]): runner.process(v) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py b/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py index 138ea87..9aeda46 100644 --- a/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py @@ -30,7 +30,6 @@ from apache_beam.runners.common import DoFnState from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager from apache_beam.runners.inprocess.inprocess_transform_result import InProcessTransformResult from apache_beam.transforms import core -from apache_beam.transforms import DoFnProcessContext from apache_beam.transforms import sideinputs from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue @@ -337,8 +336,6 @@ class _ParDoEvaluator(_TransformEvaluator): self._tagged_receivers[None].tag = None # main_tag is None. 
self._counter_factory = counters.CounterFactory() - context = DoFnProcessContext(label=transform.label, - state=DoFnState(self._counter_factory)) dofn = copy.deepcopy(transform.dofn) @@ -351,8 +348,9 @@ class _ParDoEvaluator(_TransformEvaluator): self.runner = DoFnRunner(dofn, transform.args, transform.kwargs, self._side_inputs, self._applied_ptransform.inputs[0].windowing, - context, self._tagged_receivers, - step_name=self._applied_ptransform.full_label) + tagged_receivers=self._tagged_receivers, + step_name=self._applied_ptransform.full_label, + state=DoFnState(self._counter_factory)) self.runner.start() def process_element(self, element): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/transforms/aggregator_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py index 0b3a20b..040819e 100644 --- a/sdks/python/apache_beam/transforms/aggregator_test.py +++ b/sdks/python/apache_beam/transforms/aggregator_test.py @@ -55,18 +55,18 @@ class AggregatorTest(unittest.TestCase): (any, int, True), (all, float, False), ] - aggeregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t) - for f, t, _ in counter_types] + aggregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t) + for f, t, _ in counter_types] class UpdateAggregators(beam.DoFn): def process(self, context): - for a in aggeregators: + for a in aggregators: context.aggregate_to(a, context.element) p = beam.Pipeline('DirectPipelineRunner') p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators()) # pylint: disable=expression-not-assigned res = p.run() - for (_, _, expected), a in zip(counter_types, aggeregators): + for (_, _, expected), a in zip(counter_types, aggregators): actual = res.aggregated_values(a).values()[0] self.assertEqual(expected, actual) self.assertEqual(type(expected), type(actual))