Repository: beam Updated Branches: refs/heads/master 43443c94f -> 97dde95bc
Revert clean else-return lint changes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a907323f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a907323f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a907323f Branch: refs/heads/master Commit: a907323ff8c6d4dda61087a5b026703a3c6f43a8 Parents: 43443c9 Author: Robert Bradshaw <rober...@gmail.com> Authored: Mon May 1 16:44:55 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue May 2 10:39:05 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/concat_source.py | 74 +++++++++++--------- sdks/python/apache_beam/io/filesystems_util.py | 3 +- sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++-- sdks/python/apache_beam/io/iobase.py | 3 +- sdks/python/apache_beam/io/localfilesystem.py | 3 +- sdks/python/apache_beam/io/range_trackers.py | 11 +-- sdks/python/apache_beam/io/source_test_utils.py | 7 +- sdks/python/apache_beam/io/textio.py | 13 ++-- 8 files changed, 70 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/concat_source.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py index de51f0f..1656180 100644 --- a/sdks/python/apache_beam/io/concat_source.py +++ b/sdks/python/apache_beam/io/concat_source.py @@ -84,7 +84,8 @@ class ConcatSource(iobase.BoundedSource): # Getting coder from the first sub-sources. This assumes all sub-sources # to produce the same coder. return self._source_bundles[0].source.default_output_coder() - return super(ConcatSource, self).default_output_coder() + else: + return super(ConcatSource, self).default_output_coder() class ConcatRangeTracker(iobase.RangeTracker): @@ -164,12 +165,13 @@ class ConcatRangeTracker(iobase.RangeTracker): return False elif source_ix == self._end[0] and self._end[1] is None: return False - - assert source_ix >= self._claimed_source_ix - self._claimed_source_ix = source_ix - if source_pos is None: - return True - return self.sub_range_tracker(source_ix).try_claim(source_pos) + else: + assert source_ix >= self._claimed_source_ix + self._claimed_source_ix = source_ix + if source_pos is None: + return True + else: + return self.sub_range_tracker(source_ix).try_claim(source_pos) def try_split(self, pos): source_ix, source_pos = pos @@ -183,24 +185,24 @@ class ConcatRangeTracker(iobase.RangeTracker): elif source_ix == self._end[0] and self._end[1] is None: # At/after end. return None - - if source_ix > self._claimed_source_ix: - # Prefer to split on even boundary. - split_pos = None - ratio = self._cumulative_weights[source_ix] else: - # Split the current subsource. - split = self.sub_range_tracker(source_ix).try_split( - source_pos) - if not split: - return None - split_pos, frac = split - ratio = self.local_to_global(source_ix, frac) - - self._end = source_ix, split_pos - self._cumulative_weights = [min(w / ratio, 1) - for w in self._cumulative_weights] - return (source_ix, split_pos), ratio + if source_ix > self._claimed_source_ix: + # Prefer to split on even boundary. + split_pos = None + ratio = self._cumulative_weights[source_ix] + else: + # Split the current subsource. + split = self.sub_range_tracker(source_ix).try_split( + source_pos) + if not split: + return None + split_pos, frac = split + ratio = self.local_to_global(source_ix, frac) + + self._end = source_ix, split_pos + self._cumulative_weights = [min(w / ratio, 1) + for w in self._cumulative_weights] + return (source_ix, split_pos), ratio def set_current_position(self, pos): raise NotImplementedError('Should only be called on sub-trackers') @@ -210,9 +212,10 @@ class ConcatRangeTracker(iobase.RangeTracker): last = self._end[0] if self._end[1] is None else self._end[0] + 1 if source_ix == last: return (source_ix, None) - return (source_ix, - self.sub_range_tracker(source_ix).position_at_fraction( - source_frac)) + else: + return (source_ix, + self.sub_range_tracker(source_ix).position_at_fraction( + source_frac)) def fraction_consumed(self): with self._lock: @@ -231,14 +234,15 @@ class ConcatRangeTracker(iobase.RangeTracker): if frac == 1: last = self._end[0] if self._end[1] is None else self._end[0] + 1 return (last, None) - cw = self._cumulative_weights - # Find the last source that starts at or before frac. - source_ix = bisect.bisect(cw, frac) - 1 - # Return this source, converting what's left of frac after starting - # this source into a value in [0.0, 1.0) representing how far we are - # towards the next source. - return (source_ix, - (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix])) + else: + cw = self._cumulative_weights + # Find the last source that starts at or before frac. + source_ix = bisect.bisect(cw, frac) - 1 + # Return this source, converting what's left of frac after starting + # this source into a value in [0.0, 1.0) representing how far we are + # towards the next source. + return (source_ix, + (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix])) def sub_range_tracker(self, source_ix): assert self._start[0] <= source_ix <= self._end[0] http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/filesystems_util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py index 5034068..6d21298 100644 --- a/sdks/python/apache_beam/io/filesystems_util.py +++ b/sdks/python/apache_beam/io/filesystems_util.py @@ -32,4 +32,5 @@ def get_filesystem(path): 'Google Cloud Platform IO not available, ' 'please install apache_beam[gcp]') return GCSFileSystem() - return LocalFileSystem() + else: + return LocalFileSystem() http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 0db965f..4e8d61b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -996,12 +996,13 @@ class BigQueryWrapper(object): % (project_id, dataset_id, table_id)) if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE: return found_table - # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete - # the table before this point. - return self._create_table(project_id=project_id, - dataset_id=dataset_id, - table_id=table_id, - schema=schema or found_table.schema) + else: + # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete + # the table before this point. + return self._create_table(project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + schema=schema or found_table.schema) def run_query(self, project_id, query, use_legacy_sql, flatten_results, dry_run=False): http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 2cac67f..312542a 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -805,7 +805,8 @@ class Read(ptransform.PTransform): def _infer_output_coder(self, input_type=None, input_coder=None): if isinstance(self.source, BoundedSource): return self.source.default_output_coder() - return self.source.coder + else: + return self.source.coder def display_data(self): return {'source': DisplayDataItem(self.source.__class__, http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/localfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index fbb65bf..8b2bda9 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -104,7 +104,8 @@ class LocalFileSystem(FileSystem): raw_file = open(path, mode) if compression_type == CompressionTypes.UNCOMPRESSED: return raw_file - return CompressedFile(raw_file, compression_type=compression_type) + else: + return CompressedFile(raw_file, compression_type=compression_type) def create(self, path, mime_type='application/octet-stream', compression_type=CompressionTypes.AUTO): http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/range_trackers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py index 6e7b84f..000df81 100644 --- a/sdks/python/apache_beam/io/range_trackers.py +++ b/sdks/python/apache_beam/io/range_trackers.py @@ -354,7 +354,8 @@ class OrderedPositionRangeTracker(iobase.RangeTracker): if self._stop_position is None or position < self._stop_position: self._last_claim = position return True - return False + else: + return False def position_at_fraction(self, fraction): return self.fraction_to_position( @@ -372,13 +373,15 @@ class OrderedPositionRangeTracker(iobase.RangeTracker): position, start=self._start_position, end=self._stop_position) self._stop_position = position return position, fraction - return None + else: + return None def fraction_consumed(self): if self._last_claim is self.UNSTARTED: return 0 - return self.position_to_fraction( - self._last_claim, self._start_position, self._stop_position) + else: + return self.position_to_fraction( + self._last_claim, self._start_position, self._stop_position) def position_to_fraction(self, pos, start, end): """ http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/source_test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py index 91aae33..edb6409 100644 --- a/sdks/python/apache_beam/io/source_test_utils.py +++ b/sdks/python/apache_beam/io/source_test_utils.py @@ -610,9 +610,10 @@ def _assert_split_at_fraction_concurrent( def read_or_split(test_params): if test_params[0]: return [val for val in test_params[1]] - position = test_params[1].position_at_fraction(test_params[2]) - result = test_params[1].try_split(position) - return result + else: + position = test_params[1].position_at_fraction(test_params[2]) + result = test_params[1].try_split(position) + return result inputs = [] pool = thread_pool if thread_pool else _ThreadPool(2) http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index f2c3d34..750ec45 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource): if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r': # Found a '\r\n'. Accepting that as the next separator. return (next_lf - 1, next_lf + 1) - - # Found a '\n'. Accepting that as the next separator. - return (next_lf, next_lf + 1) + else: + # Found a '\n'. Accepting that as the next separator. + return (next_lf, next_lf + 1) current_pos = len(read_buffer.data) @@ -256,9 +256,10 @@ class _TextSource(filebasedsource.FileBasedSource): # Current record should not contain the separator. return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]], sep_bounds[1] - record_start_position_in_buffer) - # Current record should contain the separator. - return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], - sep_bounds[1] - record_start_position_in_buffer) + else: + # Current record should contain the separator. + return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]], + sep_bounds[1] - record_start_position_in_buffer) class _TextSink(fileio.FileSink):