spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

2016-01-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5d2d2dd91 -> 598a5c2cc


[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

Move Py4jCallbackConnectionCleaner to the Streaming module, because the Py4J callback server is started only by StreamingContext.

Author: Shixiong Zhu 

Closes #10621 from zsxwing/SPARK-12617-2.

(cherry picked from commit 1e6648d62fb82b708ea54c51cd23bfe4f542856e)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/598a5c2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/598a5c2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/598a5c2c

Branch: refs/heads/branch-1.5
Commit: 598a5c2ccbf73d613e8237afbe75fcc5ff729396
Parents: 5d2d2dd
Author: Shixiong Zhu 
Authored: Wed Jan 6 12:03:01 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 6 12:18:33 2016 -0800

--
 python/pyspark/context.py   | 61 ---
 python/pyspark/streaming/context.py | 63 
 2 files changed, 63 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/598a5c2c/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 3d6101b..1b2a52a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -52,64 +52,6 @@ DEFAULT_CONFIGS = {
 }
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-"""
-A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-It will scan all callback connections every 30 seconds and close the dead connections.
-"""
-
-def __init__(self, gateway):
-self._gateway = gateway
-self._stopped = False
-self._timer = None
-self._lock = RLock()
-
-def start(self):
-if self._stopped:
-return
-
-def clean_closed_connections():
-from py4j.java_gateway import quiet_close, quiet_shutdown
-
-callback_server = self._gateway._callback_server
-with callback_server.lock:
-try:
-closed_connections = []
-for connection in callback_server.connections:
-if not connection.isAlive():
-quiet_close(connection.input)
-quiet_shutdown(connection.socket)
-quiet_close(connection.socket)
-closed_connections.append(connection)
-
-for closed_connection in closed_connections:
-callback_server.connections.remove(closed_connection)
-except Exception:
-import traceback
-traceback.print_exc()
-
-self._start_timer(clean_closed_connections)
-
-self._start_timer(clean_closed_connections)
-
-def _start_timer(self, f):
-from threading import Timer
-
-with self._lock:
-if not self._stopped:
-self._timer = Timer(30.0, f)
-self._timer.daemon = True
-self._timer.start()
-
-def stop(self):
-with self._lock:
-self._stopped = True
-if self._timer:
-self._timer.cancel()
-self._timer = None
-
-
 class SparkContext(object):
 
 """
@@ -124,7 +66,6 @@ class SparkContext(object):
 _active_spark_context = None
 _lock = Lock()
 _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
-_py4j_cleaner = None
 
 PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 
@@ -292,8 +233,6 @@ class SparkContext(object):
 if not SparkContext._gateway:
 SparkContext._gateway = gateway or launch_gateway()
 SparkContext._jvm = SparkContext._gateway.jvm
-_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
-_py4j_cleaner.start()
 
 if instance:
 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/598a5c2c/python/pyspark/streaming/context.py
--
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 3a8f949..74c0d30 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 
 import os
 import sys
+from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -74,6 +75,63 

spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

2016-01-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f82ebb152 -> 1e6648d62


[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

Move Py4jCallbackConnectionCleaner to the Streaming module, because the Py4J callback server is started only by StreamingContext.

Author: Shixiong Zhu 

Closes #10621 from zsxwing/SPARK-12617-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e6648d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e6648d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e6648d6

Branch: refs/heads/master
Commit: 1e6648d62fb82b708ea54c51cd23bfe4f542856e
Parents: f82ebb1
Author: Shixiong Zhu 
Authored: Wed Jan 6 12:03:01 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 6 12:03:01 2016 -0800

--
 python/pyspark/context.py   | 61 ---
 python/pyspark/streaming/context.py | 63 
 2 files changed, 63 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1e6648d6/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5e4aeac..529d16b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -54,64 +54,6 @@ DEFAULT_CONFIGS = {
 }
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-"""
-A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-It will scan all callback connections every 30 seconds and close the dead connections.
-"""
-
-def __init__(self, gateway):
-self._gateway = gateway
-self._stopped = False
-self._timer = None
-self._lock = RLock()
-
-def start(self):
-if self._stopped:
-return
-
-def clean_closed_connections():
-from py4j.java_gateway import quiet_close, quiet_shutdown
-
-callback_server = self._gateway._callback_server
-with callback_server.lock:
-try:
-closed_connections = []
-for connection in callback_server.connections:
-if not connection.isAlive():
-quiet_close(connection.input)
-quiet_shutdown(connection.socket)
-quiet_close(connection.socket)
-closed_connections.append(connection)
-
-for closed_connection in closed_connections:
-callback_server.connections.remove(closed_connection)
-except Exception:
-import traceback
-traceback.print_exc()
-
-self._start_timer(clean_closed_connections)
-
-self._start_timer(clean_closed_connections)
-
-def _start_timer(self, f):
-from threading import Timer
-
-with self._lock:
-if not self._stopped:
-self._timer = Timer(30.0, f)
-self._timer.daemon = True
-self._timer.start()
-
-def stop(self):
-with self._lock:
-self._stopped = True
-if self._timer:
-self._timer.cancel()
-self._timer = None
-
-
 class SparkContext(object):
 
 """
@@ -126,7 +68,6 @@ class SparkContext(object):
 _active_spark_context = None
 _lock = RLock()
 _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
-_py4j_cleaner = None
 
 PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 
@@ -303,8 +244,6 @@ class SparkContext(object):
 if not SparkContext._gateway:
 SparkContext._gateway = gateway or launch_gateway()
 SparkContext._jvm = SparkContext._gateway.jvm
-_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
-_py4j_cleaner.start()
 
 if instance:
 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/1e6648d6/python/pyspark/streaming/context.py
--
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 5cc4bbd..0f1f005 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 
 import os
 import sys
+from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -32,6 +33,63 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
 __all__ = ["StreamingContext"]
 
 
+class 

spark git commit: [SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

2016-01-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 175681914 -> d821fae0e


[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming

Move Py4jCallbackConnectionCleaner to the Streaming module, because the Py4J callback server is started only by StreamingContext.

Author: Shixiong Zhu 

Closes #10621 from zsxwing/SPARK-12617-2.

(cherry picked from commit 1e6648d62fb82b708ea54c51cd23bfe4f542856e)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d821fae0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d821fae0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d821fae0

Branch: refs/heads/branch-1.6
Commit: d821fae0ecca6393d3632977797d72ba594d26a9
Parents: 1756819
Author: Shixiong Zhu 
Authored: Wed Jan 6 12:03:01 2016 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 6 12:03:10 2016 -0800

--
 python/pyspark/context.py   | 61 ---
 python/pyspark/streaming/context.py | 63 
 2 files changed, 63 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5e4aeac..529d16b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -54,64 +54,6 @@ DEFAULT_CONFIGS = {
 }
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-"""
-A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-It will scan all callback connections every 30 seconds and close the dead connections.
-"""
-
-def __init__(self, gateway):
-self._gateway = gateway
-self._stopped = False
-self._timer = None
-self._lock = RLock()
-
-def start(self):
-if self._stopped:
-return
-
-def clean_closed_connections():
-from py4j.java_gateway import quiet_close, quiet_shutdown
-
-callback_server = self._gateway._callback_server
-with callback_server.lock:
-try:
-closed_connections = []
-for connection in callback_server.connections:
-if not connection.isAlive():
-quiet_close(connection.input)
-quiet_shutdown(connection.socket)
-quiet_close(connection.socket)
-closed_connections.append(connection)
-
-for closed_connection in closed_connections:
-callback_server.connections.remove(closed_connection)
-except Exception:
-import traceback
-traceback.print_exc()
-
-self._start_timer(clean_closed_connections)
-
-self._start_timer(clean_closed_connections)
-
-def _start_timer(self, f):
-from threading import Timer
-
-with self._lock:
-if not self._stopped:
-self._timer = Timer(30.0, f)
-self._timer.daemon = True
-self._timer.start()
-
-def stop(self):
-with self._lock:
-self._stopped = True
-if self._timer:
-self._timer.cancel()
-self._timer = None
-
-
 class SparkContext(object):
 
 """
@@ -126,7 +68,6 @@ class SparkContext(object):
 _active_spark_context = None
 _lock = RLock()
 _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
-_py4j_cleaner = None
 
 PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 
@@ -303,8 +244,6 @@ class SparkContext(object):
 if not SparkContext._gateway:
 SparkContext._gateway = gateway or launch_gateway()
 SparkContext._jvm = SparkContext._gateway.jvm
-_py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
-_py4j_cleaner.start()
 
 if instance:
 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/d821fae0/python/pyspark/streaming/context.py
--
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 06346e5..874cb3f 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 
 import os
 import sys
+from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -32,6