Repository: beam Updated Branches: refs/heads/master d338b4412 -> 32bf7bc64
Allow termination of DirectRunner execution with Ctrl-C Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a493f325 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a493f325 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a493f325 Branch: refs/heads/master Commit: a493f325f9e90b378b38a3cff03f8103b28d282a Parents: d338b44 Author: Charles Chen <c...@google.com> Authored: Mon Aug 21 14:18:45 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Mon Aug 21 18:32:20 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/runners/direct/direct_runner.py | 10 ++++++++++ .../python/apache_beam/runners/direct/executor.py | 18 ++++++++++++++---- .../experimental/python_rpc_direct/server.py | 2 +- .../typehints/native_type_compatibility_test.py | 3 ++- .../apache_beam/typehints/typed_pipeline_test.py | 3 ++- 5 files changed, 29 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 7a88d0e..2deb7da 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -283,6 +283,16 @@ class DirectPipelineResult(PipelineResult): self._executor = executor self._evaluation_context = evaluation_context + def __del__(self): + if self._state == PipelineState.RUNNING: + logging.warning( + 'The DirectPipelineResult is being garbage-collected while the ' + 'DirectRunner is still running the corresponding pipeline. This may ' + 'lead to incomplete execution of the pipeline if the main thread ' + 'exits before pipeline completion. Consider using ' + 'result.wait_until_finish() to wait for completion of pipeline ' + 'execution.') + def _is_in_terminal_state(self): return self._state is not PipelineState.RUNNING http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 2e46978..d465068 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -58,6 +58,9 @@ class _ExecutorService(object): self._default_name = 'ExecutorServiceWorker-' + str(index) self._update_name() self.shutdown_requested = False + + # Stop worker thread when main thread exits. + self.daemon = True self.start() def _update_name(self, task=None): @@ -78,7 +81,6 @@ class _ExecutorService(object): return None def run(self): - while not self.shutdown_requested: task = self._get_task_or_none() if task: @@ -460,9 +462,17 @@ class _ExecutorServiceParallelExecutor(object): return None def take(self): - item = self._queue.get() - self._queue.task_done() - return item + # The implementation of Queue.Queue.get() does not propagate + # KeyboardInterrupts when a timeout is not used. We therefore use a + # one-second timeout in the following loop to allow KeyboardInterrupts + # to be correctly propagated. + while True: + try: + item = self._queue.get(timeout=1) + self._queue.task_done() + return item + except Queue.Empty: + pass def offer(self, item): assert isinstance(item, self._item_type) http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py index 3addf92..bae25a4 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -17,10 +17,10 @@ """A runner implementation that submits a job for remote execution. """ -from concurrent import futures import time import uuid +from concurrent import futures import grpc from apache_beam.portability.api import beam_job_api_pb2 http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/native_type_compatibility_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index d0cafe1..0ff2b3b 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -17,9 +17,10 @@ """Test for Beam type compatibility library.""" -import typing import unittest +import typing + from apache_beam.typehints import typehints from apache_beam.typehints import native_type_compatibility http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 58274f3..59d1e1c 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,10 @@ """Unit tests for the type-hint objects and decorators.""" import inspect -import typing import unittest +import typing + import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints