spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
Repository: spark Updated Branches: refs/heads/branch-1.5 5d2d2dd91 -> 598a5c2cc [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming Move Py4jCallbackConnectionCleaner to Streaming because the callback server starts only in StreamingContext. Author: Shixiong Zhu Closes #10621 from zsxwing/SPARK-12617-2. (cherry picked from commit 1e6648d62fb82b708ea54c51cd23bfe4f542856e) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/598a5c2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/598a5c2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/598a5c2c Branch: refs/heads/branch-1.5 Commit: 598a5c2ccbf73d613e8237afbe75fcc5ff729396 Parents: 5d2d2dd Author: Shixiong Zhu Authored: Wed Jan 6 12:03:01 2016 -0800 Committer: Shixiong Zhu Committed: Wed Jan 6 12:18:33 2016 -0800 -- python/pyspark/context.py | 61 --- python/pyspark/streaming/context.py | 63 2 files changed, 63 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/598a5c2c/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 3d6101b..1b2a52a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -52,64 +52,6 @@ DEFAULT_CONFIGS = { } -class Py4jCallbackConnectionCleaner(object): - -""" -A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617. -It will scan all callback connections every 30 seconds and close the dead connections. 
-""" - -def __init__(self, gateway): -self._gateway = gateway -self._stopped = False -self._timer = None -self._lock = RLock() - -def start(self): -if self._stopped: -return - -def clean_closed_connections(): -from py4j.java_gateway import quiet_close, quiet_shutdown - -callback_server = self._gateway._callback_server -with callback_server.lock: -try: -closed_connections = [] -for connection in callback_server.connections: -if not connection.isAlive(): -quiet_close(connection.input) -quiet_shutdown(connection.socket) -quiet_close(connection.socket) -closed_connections.append(connection) - -for closed_connection in closed_connections: -callback_server.connections.remove(closed_connection) -except Exception: -import traceback -traceback.print_exc() - -self._start_timer(clean_closed_connections) - -self._start_timer(clean_closed_connections) - -def _start_timer(self, f): -from threading import Timer - -with self._lock: -if not self._stopped: -self._timer = Timer(30.0, f) -self._timer.daemon = True -self._timer.start() - -def stop(self): -with self._lock: -self._stopped = True -if self._timer: -self._timer.cancel() -self._timer = None - - class SparkContext(object): """ @@ -124,7 +66,6 @@ class SparkContext(object): _active_spark_context = None _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH -_py4j_cleaner = None PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') @@ -292,8 +233,6 @@ class SparkContext(object): if not SparkContext._gateway: SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm -_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway) -_py4j_cleaner.start() if instance: if (SparkContext._active_spark_context and http://git-wip-us.apache.org/repos/asf/spark/blob/598a5c2c/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 3a8f949..74c0d30 100644 --- 
a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -19,6 +19,7 @@ from __future__ import print_function import os import sys +from threading import RLock, Timer from py4j.java_gateway import java_import, JavaObject @@ -74,6 +75,63
spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
Repository: spark Updated Branches: refs/heads/master f82ebb152 -> 1e6648d62 [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming Move Py4jCallbackConnectionCleaner to Streaming because the callback server starts only in StreamingContext. Author: Shixiong Zhu Closes #10621 from zsxwing/SPARK-12617-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e6648d6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e6648d6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e6648d6 Branch: refs/heads/master Commit: 1e6648d62fb82b708ea54c51cd23bfe4f542856e Parents: f82ebb1 Author: Shixiong Zhu Authored: Wed Jan 6 12:03:01 2016 -0800 Committer: Shixiong Zhu Committed: Wed Jan 6 12:03:01 2016 -0800 -- python/pyspark/context.py | 61 --- python/pyspark/streaming/context.py | 63 2 files changed, 63 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1e6648d6/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 5e4aeac..529d16b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -54,64 +54,6 @@ DEFAULT_CONFIGS = { } -class Py4jCallbackConnectionCleaner(object): - -""" -A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617. -It will scan all callback connections every 30 seconds and close the dead connections. 
-""" - -def __init__(self, gateway): -self._gateway = gateway -self._stopped = False -self._timer = None -self._lock = RLock() - -def start(self): -if self._stopped: -return - -def clean_closed_connections(): -from py4j.java_gateway import quiet_close, quiet_shutdown - -callback_server = self._gateway._callback_server -with callback_server.lock: -try: -closed_connections = [] -for connection in callback_server.connections: -if not connection.isAlive(): -quiet_close(connection.input) -quiet_shutdown(connection.socket) -quiet_close(connection.socket) -closed_connections.append(connection) - -for closed_connection in closed_connections: -callback_server.connections.remove(closed_connection) -except Exception: -import traceback -traceback.print_exc() - -self._start_timer(clean_closed_connections) - -self._start_timer(clean_closed_connections) - -def _start_timer(self, f): -from threading import Timer - -with self._lock: -if not self._stopped: -self._timer = Timer(30.0, f) -self._timer.daemon = True -self._timer.start() - -def stop(self): -with self._lock: -self._stopped = True -if self._timer: -self._timer.cancel() -self._timer = None - - class SparkContext(object): """ @@ -126,7 +68,6 @@ class SparkContext(object): _active_spark_context = None _lock = RLock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH -_py4j_cleaner = None PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') @@ -303,8 +244,6 @@ class SparkContext(object): if not SparkContext._gateway: SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm -_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway) -_py4j_cleaner.start() if instance: if (SparkContext._active_spark_context and http://git-wip-us.apache.org/repos/asf/spark/blob/1e6648d6/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 5cc4bbd..0f1f005 100644 --- 
a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -19,6 +19,7 @@ from __future__ import print_function import os import sys +from threading import RLock, Timer from py4j.java_gateway import java_import, JavaObject @@ -32,6 +33,63 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize __all__ = ["StreamingContext"] +class
spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
Repository: spark Updated Branches: refs/heads/branch-1.6 175681914 -> d821fae0e [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming Move Py4jCallbackConnectionCleaner to Streaming because the callback server starts only in StreamingContext. Author: Shixiong Zhu Closes #10621 from zsxwing/SPARK-12617-2. (cherry picked from commit 1e6648d62fb82b708ea54c51cd23bfe4f542856e) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d821fae0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d821fae0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d821fae0 Branch: refs/heads/branch-1.6 Commit: d821fae0ecca6393d3632977797d72ba594d26a9 Parents: 1756819 Author: Shixiong Zhu Authored: Wed Jan 6 12:03:01 2016 -0800 Committer: Shixiong Zhu Committed: Wed Jan 6 12:03:10 2016 -0800 -- python/pyspark/context.py | 61 --- python/pyspark/streaming/context.py | 63 2 files changed, 63 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 5e4aeac..529d16b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -19,6 +19,7 @@ -- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -54,64 +54,6 @@ DEFAULT_CONFIGS = { } -class Py4jCallbackConnectionCleaner(object): - -""" -A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617. -It will scan all callback connections every 30 seconds and close the dead connections. 
-""" - -def __init__(self, gateway): -self._gateway = gateway -self._stopped = False -self._timer = None -self._lock = RLock() - -def start(self): -if self._stopped: -return - -def clean_closed_connections(): -from py4j.java_gateway import quiet_close, quiet_shutdown - -callback_server = self._gateway._callback_server -with callback_server.lock: -try: -closed_connections = [] -for connection in callback_server.connections: -if not connection.isAlive(): -quiet_close(connection.input) -quiet_shutdown(connection.socket) -quiet_close(connection.socket) -closed_connections.append(connection) - -for closed_connection in closed_connections: -callback_server.connections.remove(closed_connection) -except Exception: -import traceback -traceback.print_exc() - -self._start_timer(clean_closed_connections) - -self._start_timer(clean_closed_connections) - -def _start_timer(self, f): -from threading import Timer - -with self._lock: -if not self._stopped: -self._timer = Timer(30.0, f) -self._timer.daemon = True -self._timer.start() - -def stop(self): -with self._lock: -self._stopped = True -if self._timer: -self._timer.cancel() -self._timer = None - - class SparkContext(object): """ @@ -126,7 +68,6 @@ class SparkContext(object): _active_spark_context = None _lock = RLock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH -_py4j_cleaner = None PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') @@ -303,8 +244,6 @@ class SparkContext(object): if not SparkContext._gateway: SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm -_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway) -_py4j_cleaner.start() if instance: if (SparkContext._active_spark_context and http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 06346e5..874cb3f 100644 --- 
a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -19,6 +19,7 @@ from __future__ import print_function import os import sys +from threading import RLock, Timer from py4j.java_gateway import java_import, JavaObject @@ -32,6