This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new c50151b [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value c50151b is described below commit c50151b5523705a58e4c38ee71f351b6f6f06699 Author: Dian Fu <dia...@apache.org> AuthorDate: Fri Jan 7 10:56:26 2022 +0800 [FLINK-24880][python] Fix PeriodicThread to handle properly for negative wait timeout value This closes #18292. --- .../pyflink/fn_execution/beam/beam_stream_fast.pyx | 2 +- .../pyflink/fn_execution/beam/beam_stream_slow.py | 2 +- .../pyflink/fn_execution/utils/operation_utils.py | 35 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx index 583a0c2..d7ed666 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx +++ b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx @@ -23,7 +23,7 @@ from libc.stdlib cimport realloc from libc.string cimport memcpy -from apache_beam.runners.worker.data_plane import PeriodicThread +from pyflink.fn_execution.utils.operation_utils import PeriodicThread cdef class BeamInputStream(LengthPrefixInputStream): def __cinit__(self, input_stream, size): diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py index 376a861..2210865 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py +++ b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py @@ -16,9 +16,9 @@ # limitations under the License. ################################################################################ from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream -from apache_beam.runners.worker.data_plane import PeriodicThread from pyflink.fn_execution.stream_slow import InputStream +from pyflink.fn_execution.utils.operation_utils import PeriodicThread class BeamInputStream(InputStream): diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py index f779cfd..9ed177d 100644 --- a/flink-python/pyflink/fn_execution/utils/operation_utils.py +++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py @@ -16,6 +16,8 @@ # limitations under the License. ################################################################################ import datetime +import threading +import time from collections.abc import Generator from functools import partial @@ -271,3 +273,36 @@ def load_aggregate_function(payload): return cls() else: return pickle.loads(payload) + + +class PeriodicThread(threading.Thread): + """Call a function periodically with the specified number of seconds""" + + def __init__(self, + interval, + function, + args=None, + kwargs=None + ) -> None: + threading.Thread.__init__(self) + self._interval = interval + self._function = function + self._args = args if args is not None else [] + self._kwargs = kwargs if kwargs is not None else {} + self._finished = threading.Event() + + def run(self) -> None: + now = time.time() + next_call = now + self._interval + while (next_call <= now and not self._finished.is_set()) or \ + (not self._finished.wait(next_call - now)): + if next_call <= now: + next_call = now + self._interval + else: + next_call = next_call + self._interval + self._function(*self._args, **self._kwargs) + now = time.time() + + def cancel(self) -> None: + """Stop the thread if it hasn't finished yet.""" + self._finished.set()