Avoid circular imports.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2dc6855
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2dc6855
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2dc6855

Branch: refs/heads/python-sdk
Commit: e2dc685516d3869a4c2ebfe7d389c5d4c948745b
Parents: 7852075
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Oct 6 16:37:45 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Oct 10 10:30:00 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct_runner.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2dc6855/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index a62ddf7..9eb587c 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -30,8 +30,6 @@ import logging
 
 from apache_beam import coders
 from apache_beam import error
-from apache_beam.io import fileio
-from apache_beam.io import iobase
 from apache_beam.pvalue import DictPCollectionView
 from apache_beam.pvalue import EmptySideInput
 from apache_beam.pvalue import IterablePCollectionView
@@ -244,6 +242,9 @@ class DirectPipelineRunner(PipelineRunner):
           transform_node.full_label] += len(read_result)
       self._cache.cache_output(transform_node, read_result)
 
+    # pylint: disable=import-at-top
+    from apache_beam.io import iobase
+
     if isinstance(source, iobase.BoundedSource):
       # Getting a RangeTracker for the default range of the source and reading
       # the full source using that.
@@ -257,6 +258,10 @@ class DirectPipelineRunner(PipelineRunner):
   @skip_if_cached
   def run__NativeWrite(self, transform_node):
     sink = transform_node.transform.sink
+
+    # pylint: disable=import-at-top
+    from apache_beam.io import fileio
+
     if isinstance(sink, fileio.NativeTextFileSink):
       assert sink.num_shards in (0, 1)
       if sink.shard_name_template: