[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23166


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237886286
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
     val env = SparkEnv.get
     val in = sock.getInputStream()
-    val dir = new File(Utils.getLocalDir(env.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+    val abspath = new File(path).getAbsolutePath
+    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

Yes, +1. Sorry, I didn't mean to get things stuck on this; I just wanted to
make sure I was following what was happening correctly.





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-30 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237875851
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
     val env = SparkEnv.get
     val in = sock.getInputStream()
-    val dir = new File(Utils.getLocalDir(env.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+    val abspath = new File(path).getAbsolutePath
+    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

OK, I think we agree it's good this way (just to verify, though, I won't
commit until you +1 it). But yes, you are correct: now that we are using the
decryption server, which reads from the path in PythonBroadcast, the path
change isn't strictly necessary. Without the change, though, the value of
self._path in broadcast.py doesn't match the path in PythonBroadcast, so I
think it's better to have those match.





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237738802
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
     val env = SparkEnv.get
     val in = sock.getInputStream()
-    val dir = new File(Utils.getLocalDir(env.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+    val abspath = new File(path).getAbsolutePath
+    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

Yeah, I see how it was wrong before. I'm saying that after you add
`setupDecryptionServer`, that decryption server would still be reading from
the value of `path`, which gets updated here, since it's the same object in
the driver's JVM.

Anyway, this isn't a big deal; I think it's better with your change.





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-29 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237652232
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
     val env = SparkEnv.get
     val in = sock.getInputStream()
-    val dir = new File(Utils.getLocalDir(env.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+    val abspath = new File(path).getAbsolutePath
+    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

In the old version, with encryption turned off, we generated a random path,
and both reads and writes used that random path. When the encryption-related
code was written, we introduced a new "broadcast" path. The problem is that
when we tried to decrypt on the driver side, it looked at the random path
reference still lying around and tried to decrypt from it, but the actual
data was in the new "broadcast" path location. So, by just passing the random
path reference, we make sure all the places are in sync, with and without
encryption.
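
The mismatch described above can be reproduced in miniature without Spark. The following is only a sketch under stated assumptions: `write_broadcast`, `read_broadcast`, and `new_temp_file` are hypothetical helpers, plain `pickle` stands in for the broadcast serialization, and no encryption is involved. It shows that reading from a file nobody wrote to fails with `EOFError`, while sharing one path between writer and reader works.

```python
import os
import pickle
import tempfile


def write_broadcast(value, path):
    # Stands in for the side that writes the broadcast data to disk.
    with open(path, "wb") as f:
        pickle.dump(value, f)


def read_broadcast(path):
    # Stands in for the driver-side read of the broadcast value.
    with open(path, "rb") as f:
        return pickle.load(f)


def new_temp_file(prefix):
    # Create an empty temp file and return its path.
    fd, path = tempfile.mkstemp(prefix=prefix)
    os.close(fd)
    return path


def demo_mismatched_paths():
    # The reader keeps a reference to one file; the writer picks another.
    reader_path = new_temp_file("random")
    writer_path = new_temp_file("broadcast")
    try:
        write_broadcast(["scala", "java"], writer_path)
        try:
            read_broadcast(reader_path)  # empty file: nothing to unpickle
        except EOFError:
            return "EOFError"
        return "no error"
    finally:
        os.remove(reader_path)
        os.remove(writer_path)


def demo_shared_path():
    # Writer and reader agree on a single path, so the read succeeds.
    shared_path = new_temp_file("random")
    try:
        write_broadcast(["scala", "java"], shared_path)
        return read_broadcast(shared_path)
    finally:
        os.remove(shared_path)
```

Calling `demo_mismatched_paths()` exercises the failing arrangement and `demo_shared_path()` the arrangement where both sides use the same path.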





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237535187
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
     val env = SparkEnv.get
     val in = sock.getInputStream()
-    val dir = new File(Utils.getLocalDir(env.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+    val abspath = new File(path).getAbsolutePath
+    val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

just want to make sure I understand this part -- this change isn't 
necessary, right?  even in the old version, `path` gets updated here, so 
`setupDecryptionServer` would know where to read the data from.

that said, I do think your change makes more sense -- not sure why I didn't 
just use the supplied path in the first place.





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237250388
  
--- Diff: python/pyspark/broadcast.py ---
@@ -134,7 +137,15 @@ def value(self):
         """ Return the broadcasted value
         """
         if not hasattr(self, "_value") and self._path is not None:
-            self._value = self.load_from_path(self._path)
+            # we only need to decrypt it here when encryption is enabled and
--- End diff --

Sorry, yeah, I thought hasattr would take care of that. My bad: I had the
`self._sc is not None` check earlier and will add it back in load_from_path.





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237248890
  
--- Diff: python/pyspark/broadcast.py ---
@@ -134,7 +137,15 @@ def value(self):
         """ Return the broadcasted value
         """
         if not hasattr(self, "_value") and self._path is not None:
-            self._value = self.load_from_path(self._path)
+            # we only need to decrypt it here when encryption is enabled and
--- End diff --

I think you can get in here on executors when encryption is off, and
self._sc would not be defined.
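
One defensive way to handle the concern above is to look the attribute up with a default, so the check cannot fail when the object has no context attribute at all. This is a minimal sketch and not the code from the pull request: `ExecutorSideBroadcast`, `FakeContext`, and `encryption_enabled` are hypothetical names used only for illustration.

```python
class FakeContext:
    # Hypothetical stand-in for a SparkContext; only the flag is modelled.
    _encryption_enabled = True


class ExecutorSideBroadcast:
    # Hypothetical stand-in for a broadcast object rebuilt on an executor.
    def __init__(self, path):
        self._path = path  # note: no _sc attribute is set here


def encryption_enabled(bcast):
    # Fall back to None when the attribute is missing, instead of raising
    # AttributeError on objects that never had a context assigned.
    sc = getattr(bcast, "_sc", None)
    return sc is not None and bool(getattr(sc, "_encryption_enabled", False))


def demo():
    on_executor = ExecutorSideBroadcast("/tmp/hypothetical-path")
    on_driver = ExecutorSideBroadcast("/tmp/hypothetical-path")
    on_driver._sc = FakeContext()
    return encryption_enabled(on_executor), encryption_enabled(on_driver)
```

Here `hasattr(self, "_value")` in the quoted diff only guards `_value`; it says nothing about `_sc`, which is why a separate guard is needed.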





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237231750
  
--- Diff: python/pyspark/broadcast.py ---
@@ -118,8 +121,16 @@ def dump(self, value, f):
         f.close()
 
     def load_from_path(self, path):
-        with open(path, 'rb', 1 << 20) as f:
-            return self.load(f)
+        # we only need to decrypt it here if its on the driver since executor
+        # decryption handled already
+        if self._sc is not None and self._sc._encryption_enabled:
--- End diff --

makes sense, will move it there





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237228157
  
--- Diff: python/pyspark/broadcast.py ---
@@ -118,8 +121,16 @@ def dump(self, value, f):
         f.close()
 
     def load_from_path(self, path):
-        with open(path, 'rb', 1 << 20) as f:
-            return self.load(f)
+        # we only need to decrypt it here if its on the driver since executor
+        # decryption handled already
+        if self._sc is not None and self._sc._encryption_enabled:
--- End diff --

Can you move the entire conditional check into `value()` instead, and keep
`load_from_path` like it was before? `value()` is already checking for
different scenarios, and doing it there keeps the meaning of
`load_from_path` more straightforward.
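
The layout suggested above might look roughly like the following. This is a sketch under stated assumptions, not the merged code: `SketchBroadcast` and `FakeContext` are hypothetical classes, and `open_decrypted` is a made-up callable standing in for whatever mechanism hands back a decrypted stream on the driver.

```python
import io
import os
import pickle
import tempfile


class FakeContext:
    # Hypothetical stand-in for a SparkContext; only the flag is modelled.
    def __init__(self, encryption_enabled):
        self._encryption_enabled = encryption_enabled


class SketchBroadcast:
    def __init__(self, path, sc=None, open_decrypted=None):
        self._path = path
        self._sc = sc
        # Hypothetical hook: returns a readable binary stream of decrypted data.
        self._open_decrypted = open_decrypted

    def load(self, f):
        return pickle.load(f)

    def load_from_path(self, path):
        # Kept as a plain read with no encryption logic.
        with open(path, "rb", 1 << 20) as f:
            return self.load(f)

    @property
    def value(self):
        # All scenario checks live here, in one place.
        if not hasattr(self, "_value") and self._path is not None:
            if self._sc is not None and self._sc._encryption_enabled:
                with self._open_decrypted(self._path) as f:
                    self._value = self.load(f)
            else:
                self._value = self.load_from_path(self._path)
        return self._value


def demo():
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as f:
        pickle.dump("plain", f)
    try:
        plain = SketchBroadcast(path).value
        decrypted = SketchBroadcast(
            path,
            sc=FakeContext(True),
            open_decrypted=lambda _p: io.BytesIO(pickle.dumps("decrypted")),
        ).value
        return plain, decrypted
    finally:
        os.remove(path)
```

The design point is that `load_from_path` keeps a single meaning (read and unpickle a file), while the driver-versus-executor and encryption decisions stay in `value`.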





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237212353
  
--- Diff: python/pyspark/tests/test_broadcast.py ---
@@ -67,6 +67,20 @@ def test_broadcast_with_encryption(self):
     def test_broadcast_no_encryption(self):
         self._test_multiple_broadcasts()
 
+    def _test_broadcast_on_driver(self, *extra_confs):
+        conf = SparkConf()
+        for key, value in extra_confs:
+            conf.set(key, value)
+        conf.setMaster("local-cluster[2,1,1024]")
+        self.sc = SparkContext(conf=conf)
+        bs = self.sc.broadcast(value=5)
+        self.assertEqual(5, bs.value)
+
+    def test_broadcast_value_driver_no_encryption(self):
+        self._test_broadcast_on_driver()
+
+    def test_broadcast_value_driver_encryption(self):
+        self.self._test_broadcast_on_driver(("spark.io.encryption.enabled", "true"))
--- End diff --

There is an extra `self.` here (`self.self._test_broadcast_on_driver`).





[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread redsanket
GitHub user redsanket opened a pull request:

https://github.com/apache/spark/pull/23166

[SPARK-26201] Fix python broadcast with encryption

## What changes were proposed in this pull request?
When Python is run with RPC and disk encryption enabled, and a Python
broadcast variable is created and its value is simply read back on the driver
side, the job fails with:

 

Traceback (most recent call last):
  File "broadcast.py", line 37, in 
    words_new.value
  File "/pyspark.zip/pyspark/broadcast.py", line 137, in value
  File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path
  File "pyspark.zip/pyspark/broadcast.py", line 128, in load
EOFError: Ran out of input

To reproduce use configs: --conf spark.network.crypto.enabled=true --conf 
spark.io.encryption.enabled=true

 

Code:

words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
words_new.value
print(words_new.value)

## How was this patch tested?
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
textFile = sc.textFile("README.md")
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a+b)
count = wordCounts.count()
print(count)
words_new.value
print(words_new.value)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/redsanket/spark SPARK-26201

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23166


commit 67a2ac87fb6e2d3fd4a5f260047a37bd2858228d
Author: schintap 
Date:   2018-11-28T16:20:55Z

[SPARK-26201] Fix python broadcast with encryption



