Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b28e71979 -> 1d53e28e9


Revert the changes to the accidentally reverted files


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4760fe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4760fe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4760fe3

Branch: refs/heads/python-sdk
Commit: a4760fe3fcd5a0dda31d137aee29b3cd825de9cf
Parents: b28e719
Author: Ahmet Altay <al...@google.com>
Authored: Mon Aug 15 15:00:32 2016 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Aug 15 15:00:32 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            | 25 ++++++++++++++++----
 sdks/python/apache_beam/io/gcsio.py             |  9 +++++--
 sdks/python/apache_beam/io/gcsio_test.py        | 23 +++++++++++++++++-
 sdks/python/apache_beam/runners/common.pxd      |  2 +-
 sdks/python/apache_beam/runners/common.py       | 18 ++++++++++----
 .../python/apache_beam/runners/direct_runner.py |  9 +++----
 .../runners/inprocess/transform_evaluator.py    |  8 +++----
 .../apache_beam/transforms/aggregator_test.py   |  8 +++----
 8 files changed, 75 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index b1e091b..a6ce26a 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -283,7 +283,17 @@ class _CompressionType(object):
     self.identifier = identifier
 
   def __eq__(self, other):
-    return self.identifier == other.identifier
+    return (isinstance(other, _CompressionType) and
+            self.identifier == other.identifier)
+
+  def __hash__(self):
+    return hash(self.identifier)
+
+  def __ne__(self, other):
+    return not self.__eq__(other)
+
+  def __repr__(self):
+    return '_CompressionType(%s)' % self.identifier
 
 
 class CompressionTypes(object):
@@ -530,15 +540,22 @@ class FileSink(iobase.Sink):
         channel_factory.rename(old_name, final_name)
       except IOError as e:
         # May have already been copied.
-        exists = channel_factory.exists(final_name)
+        try:
+          exists = channel_factory.exists(final_name)
+        except Exception as exists_e:  # pylint: disable=broad-except
+          logging.warning('Exception when checking if file %s exists: '
+                          '%s', final_name, exists_e)
+          # Returning original exception after logging the exception from
+          # exists() call.
+          return (None, e)
         if not exists:
           logging.warning(('IOError in _rename_file. old_name: %s, '
                            'final_name: %s, err: %s'), old_name, final_name, e)
-          return(None, e)
+          return (None, e)
       except Exception as e:  # pylint: disable=broad-except
         logging.warning(('Exception in _rename_file. old_name: %s, '
                          'final_name: %s, err: %s'), old_name, final_name, e)
-        return(None, e)
+        return (None, e)
       return (final_name, None)
 
     # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py 
b/sdks/python/apache_beam/io/gcsio.py
index 88fcfb8..7bb532c 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -234,8 +234,13 @@ class GcsIO(object):
                                                  object=object_path)
       self.client.objects.Get(request)  # metadata
       return True
-    except IOError:
-      return False
+    except HttpError as http_error:
+      if http_error.status_code == 404:
+        # HTTP 404 indicates that the file did not exist
+        return False
+      else:
+        # We re-raise all other exceptions
+        raise
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py 
b/sdks/python/apache_beam/io/gcsio_test.py
index 7b15ef3..99c99b3 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -29,6 +29,7 @@ from apitools.base.py.exceptions import HttpError
 from apache_beam.internal.clients import storage
 
 from apache_beam.io import gcsio
+from mock import patch
 
 
 class FakeGcsClient(object):
@@ -79,7 +80,8 @@ class FakeGcsObjects(object):
   def Get(self, get_request, download=None):  # pylint: disable=invalid-name
     f = self.get_file(get_request.bucket, get_request.object)
     if f is None:
-      raise ValueError('Specified object does not exist.')
+      # Failing with a HTTP 404 if file does not exist.
+      raise HttpError({'status':404}, None, None)
     if download is None:
       return f.get_metadata()
     else:
@@ -189,6 +191,25 @@ class TestGCSIO(unittest.TestCase):
     self.client = FakeGcsClient()
     self.gcs = gcsio.GcsIO(self.client)
 
+  def test_exists(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    self._insert_random_file(self.client, file_name, file_size)
+    self.assertFalse(self.gcs.exists(file_name + 'xyz'))
+    self.assertTrue(self.gcs.exists(file_name))
+
+  @patch.object(FakeGcsObjects, 'Get')
+  def test_exists_failure(self, mock_get):
+    # Raising an error other than 404. Raising 404 is a valid failure for
+    # exists() call.
+    mock_get.side_effect = HttpError({'status':400}, None, None)
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    self._insert_random_file(self.client, file_name, file_size)
+    with self.assertRaises(HttpError) as cm:
+      self.gcs.exists(file_name)
+    self.assertEquals(400, cm.exception.status_code)
+
   def test_size(self):
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd 
b/sdks/python/apache_beam/runners/common.pxd
index 7acd049..5cd4cf8 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -32,7 +32,7 @@ cdef class DoFnRunner(Receiver):
   cdef object dofn
   cdef object dofn_process
   cdef object window_fn
-  cdef object context   # TODO(robertwb): Make this a DoFnContext
+  cdef DoFnContext context
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index c017704..67277c3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -59,12 +59,16 @@ class DoFnRunner(Receiver):
                kwargs,
                side_inputs,
                windowing,
-               context,
-               tagged_receivers,
+               context=None,
+               tagged_receivers=None,
                logger=None,
                step_name=None,
                # Preferred alternative to logger
-               logging_context=None):
+               # TODO(robertwb): Remove once all runners are updated.
+               logging_context=None,
+               # Preferred alternative to context
+               # TODO(robertwb): Remove once all runners are updated.
+               state=None):
     if not args and not kwargs:
       self.dofn = fn
       self.dofn_process = fn.process
@@ -85,10 +89,16 @@ class DoFnRunner(Receiver):
       self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
 
     self.window_fn = windowing.windowfn
-    self.context = context
     self.tagged_receivers = tagged_receivers
     self.step_name = step_name
 
+    if state:
+      assert context is None
+      self.context = DoFnContext(self.step_name, state=state)
+    else:
+      assert context is not None
+      self.context = context
+
     if logging_context:
       self.logging_context = logging_context
     else:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py 
b/sdks/python/apache_beam/runners/direct_runner.py
index e0df439..a62ddf7 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -43,7 +43,6 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import DoFnProcessContext
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -138,9 +137,6 @@ class DirectPipelineRunner(PipelineRunner):
   @skip_if_cached
   def run_ParDo(self, transform_node):
     transform = transform_node.transform
-    # TODO(gildea): what is the appropriate object to attach the state to?
-    context = DoFnProcessContext(label=transform.label,
-                                 state=DoFnState(self._counter_factory))
 
     side_inputs = [self._cache.get_pvalue(view)
                    for view in transform_node.side_inputs]
@@ -176,8 +172,9 @@ class DirectPipelineRunner(PipelineRunner):
 
     runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs,
                         side_inputs, transform_node.inputs[0].windowing,
-                        context, TaggedReceivers(),
-                        step_name=transform_node.full_label)
+                        tagged_receivers=TaggedReceivers(),
+                        step_name=transform_node.full_label,
+                        state=DoFnState(self._counter_factory))
     runner.start()
     for v in self._cache.get_pvalue(transform_node.inputs[0]):
       runner.process(v)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py 
b/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py
index 138ea87..9aeda46 100644
--- a/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/inprocess/transform_evaluator.py
@@ -30,7 +30,6 @@ from apache_beam.runners.common import DoFnState
 from apache_beam.runners.inprocess.inprocess_watermark_manager import 
InProcessWatermarkManager
 from apache_beam.runners.inprocess.inprocess_transform_result import 
InProcessTransformResult
 from apache_beam.transforms import core
-from apache_beam.transforms import DoFnProcessContext
 from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
@@ -337,8 +336,6 @@ class _ParDoEvaluator(_TransformEvaluator):
     self._tagged_receivers[None].tag = None  # main_tag is None.
 
     self._counter_factory = counters.CounterFactory()
-    context = DoFnProcessContext(label=transform.label,
-                                 state=DoFnState(self._counter_factory))
 
     dofn = copy.deepcopy(transform.dofn)
 
@@ -351,8 +348,9 @@ class _ParDoEvaluator(_TransformEvaluator):
     self.runner = DoFnRunner(dofn, transform.args, transform.kwargs,
                              self._side_inputs,
                              self._applied_ptransform.inputs[0].windowing,
-                             context, self._tagged_receivers,
-                             step_name=self._applied_ptransform.full_label)
+                             tagged_receivers=self._tagged_receivers,
+                             step_name=self._applied_ptransform.full_label,
+                             state=DoFnState(self._counter_factory))
     self.runner.start()
 
   def process_element(self, element):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4760fe3/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py 
b/sdks/python/apache_beam/transforms/aggregator_test.py
index 0b3a20b..040819e 100644
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ b/sdks/python/apache_beam/transforms/aggregator_test.py
@@ -55,18 +55,18 @@ class AggregatorTest(unittest.TestCase):
         (any, int, True),
         (all, float, False),
     ]
-    aggeregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t)
-                    for f, t, _ in counter_types]
+    aggregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t)
+                   for f, t, _ in counter_types]
 
     class UpdateAggregators(beam.DoFn):
       def process(self, context):
-        for a in aggeregators:
+        for a in aggregators:
           context.aggregate_to(a, context.element)
 
     p = beam.Pipeline('DirectPipelineRunner')
     p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: 
disable=expression-not-assigned
     res = p.run()
-    for (_, _, expected), a in zip(counter_types, aggeregators):
+    for (_, _, expected), a in zip(counter_types, aggregators):
       actual = res.aggregated_values(a).values()[0]
       self.assertEqual(expected, actual)
       self.assertEqual(type(expected), type(actual))

Reply via email to