[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/23166 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237886286

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
 override def handleConnection(sock: Socket): Unit = {
   val env = SparkEnv.get
   val in = sock.getInputStream()
-  val dir = new File(Utils.getLocalDir(env.conf))
-  val file = File.createTempFile("broadcast", "", dir)
-  path = file.getAbsolutePath
-  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+  val abspath = new File(path).getAbsolutePath
+  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

Yes, +1. Sorry, I didn't mean to get things stuck on this; I just wanted to make sure I was actually following what was happening correctly.
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237875851

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
 override def handleConnection(sock: Socket): Unit = {
   val env = SparkEnv.get
   val in = sock.getInputStream()
-  val dir = new File(Utils.getLocalDir(env.conf))
-  val file = File.createTempFile("broadcast", "", dir)
-  path = file.getAbsolutePath
-  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+  val abspath = new File(path).getAbsolutePath
+  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

OK, I think we agree it's good this way (just to verify, though, I won't commit until you +1 it). But yes, you are correct: now that we are using the decryption server, which reads from the path in PythonBroadcast, the path change isn't strictly necessary. However, the value of self._path in broadcast.py doesn't match the path in PythonBroadcast, so I think it's better to have those match.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237738802

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
 override def handleConnection(sock: Socket): Unit = {
   val env = SparkEnv.get
   val in = sock.getInputStream()
-  val dir = new File(Utils.getLocalDir(env.conf))
-  val file = File.createTempFile("broadcast", "", dir)
-  path = file.getAbsolutePath
-  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+  val abspath = new File(path).getAbsolutePath
+  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

Yeah, I see how it was wrong before. I'm saying that, after you add `setupDecryptionServer`, the decryption server would still be reading from the value of `path` which gets updated here, since it's the same object in the driver's JVM. Anyway, this isn't a big deal; I think it's better with your change.
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237652232

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
 override def handleConnection(sock: Socket): Unit = {
   val env = SparkEnv.get
   val in = sock.getInputStream()
-  val dir = new File(Utils.getLocalDir(env.conf))
-  val file = File.createTempFile("broadcast", "", dir)
-  path = file.getAbsolutePath
-  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+  val abspath = new File(path).getAbsolutePath
+  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

In the old version, we generated a random path, and with encryption turned off everything read and wrote from that random path. When the encryption-related code was written, it introduced a new "broadcast" path. The problem is that when we tried to decrypt on the driver side, it looked at the random path reference lying around and tried to decrypt from it, but the actual data was in the new "broadcast" path location. So, by just passing the random reference through, we make sure all the places are in sync both with and without encryption.
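The stale-path problem described in this comment can be illustrated with a small, self-contained Python sketch. Everything here (the `Writer` class and its methods) is a hypothetical toy model of the mismatch, not the actual Spark code: the reader keeps one agreed path while the old writer created a brand-new temp file and only updated its own copy of the path.

```python
import os
import tempfile

class Writer:
    """Toy stand-in for the JVM-side writer; not the real PythonBroadcast."""

    def __init__(self, path):
        self.path = path

    def write_to_new_file(self, data):
        # Old behaviour: ignore the supplied path and create a fresh temp file.
        fd, new_path = tempfile.mkstemp(prefix="broadcast")
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        # Only the writer's own copy of the path is updated; the reader's isn't.
        self.path = new_path

    def write_to_supplied_path(self, data):
        # Fixed behaviour: write to the path both sides already agreed on.
        with open(self.path, "wb") as f:
            f.write(data)

# Both sides agree on one path up front.
fd, agreed_path = tempfile.mkstemp(prefix="broadcast")
os.close(fd)

# Old scheme: the data lands in a different file, so the reader sees nothing.
w = Writer(agreed_path)
w.write_to_new_file(b"payload")
with open(agreed_path, "rb") as f:
    stale = f.read()

# Fixed scheme: reader and writer stay in sync.
w2 = Writer(agreed_path)
w2.write_to_supplied_path(b"payload")
with open(agreed_path, "rb") as f:
    ok = f.read()

print(stale == b"")      # the reader's path was left empty under the old scheme
print(ok == b"payload")  # writing to the supplied path keeps both sides in sync
```

Deserializing from that empty file is exactly the kind of situation that surfaces as an "EOFError: Ran out of input" on the driver.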
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237535187

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
 override def handleConnection(sock: Socket): Unit = {
   val env = SparkEnv.get
   val in = sock.getInputStream()
-  val dir = new File(Utils.getLocalDir(env.conf))
-  val file = File.createTempFile("broadcast", "", dir)
-  path = file.getAbsolutePath
-  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+  val abspath = new File(path).getAbsolutePath
+  val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
--- End diff --

Just want to make sure I understand this part -- this change isn't necessary, right? Even in the old version, `path` gets updated here, so `setupDecryptionServer` would know where to read the data from. That said, I do think your change makes more sense -- not sure why I didn't just use the supplied path in the first place.
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237250388

--- Diff: python/pyspark/broadcast.py ---
@@ -134,7 +137,15 @@ def value(self):
     """ Return the broadcasted value """
     if not hasattr(self, "_value") and self._path is not None:
-        self._value = self.load_from_path(self._path)
+        # we only need to decrypt it here when encryption is enabled and
--- End diff --

Sorry, yeah, I thought hasattr would take care of that, my bad. Earlier I had the check self._sc is not None; will add it back in load_from_path.
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237248890

--- Diff: python/pyspark/broadcast.py ---
@@ -134,7 +137,15 @@ def value(self):
     """ Return the broadcasted value """
     if not hasattr(self, "_value") and self._path is not None:
-        self._value = self.load_from_path(self._path)
+        # we only need to decrypt it here when encryption is enabled and
--- End diff --

I think you can get in here when encryption is off on executors, and self._sc would not be defined.
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237231750

--- Diff: python/pyspark/broadcast.py ---
@@ -118,8 +121,16 @@ def dump(self, value, f):
     f.close()

 def load_from_path(self, path):
-    with open(path, 'rb', 1 << 20) as f:
-        return self.load(f)
+    # we only need to decrypt it here if its on the driver since executor
+    # decryption handled already
+    if self._sc is not None and self._sc._encryption_enabled:
--- End diff --

Makes sense, will move it there.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237228157

--- Diff: python/pyspark/broadcast.py ---
@@ -118,8 +121,16 @@ def dump(self, value, f):
     f.close()

 def load_from_path(self, path):
-    with open(path, 'rb', 1 << 20) as f:
-        return self.load(f)
+    # we only need to decrypt it here if its on the driver since executor
+    # decryption handled already
+    if self._sc is not None and self._sc._encryption_enabled:
--- End diff --

Can you move the entire conditional check into `value()` instead, and keep `load_from_path` like it was before? `value()` is already checking for different scenarios, and this keeps the meaning of `load_from_path` more straightforward.
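The restructuring suggested here can be sketched as a minimal, self-contained Python class. This is a simplified stand-in, not the real pyspark `Broadcast` (in particular, `_decrypt_from_jvm` is a hypothetical placeholder for the driver-side decryption-server read): `load_from_path` stays a plain file read, and all the scenario branching lives in `value()`.

```python
import os
import tempfile

class Broadcast:
    """Simplified stand-in for pyspark's Broadcast, illustrating the layout."""

    def __init__(self, sc=None, path=None):
        self._sc = sc      # None on executors, a context-like object on the driver
        self._path = path

    def load_from_path(self, path):
        # Unchanged from before: just read (and, in the real code, deserialize).
        with open(path, "rb") as f:
            return f.read()

    def _decrypt_from_jvm(self):
        # Hypothetical placeholder: the real code reads the decrypted stream
        # from a JVM-side decryption server over a socket.
        return b"decrypted"

    @property
    def value(self):
        if not hasattr(self, "_value") and self._path is not None:
            # All scenario-dependent branching is kept here in value():
            # driver with encryption goes through the decryption path,
            # everything else reads the file directly.
            if self._sc is not None and self._sc._encryption_enabled:
                self._value = self._decrypt_from_jvm()
            else:
                self._value = self.load_from_path(self._path)
        return self._value

class FakeSC:
    """Hypothetical minimal driver context carrying only the flag we need."""
    _encryption_enabled = True

# Executor side: no context object, plain file read.
fd, p = tempfile.mkstemp()
os.write(fd, b"plain")
os.close(fd)
executor_side = Broadcast(sc=None, path=p)

# Driver side with encryption: goes through the decryption branch.
driver_side = Broadcast(sc=FakeSC(), path=p)

print(executor_side.value == b"plain")
print(driver_side.value == b"decrypted")
```

The design point is that `load_from_path` keeps a single obvious meaning, while the caller that already distinguishes driver from executor makes the encryption decision.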
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/23166#discussion_r237212353

--- Diff: python/pyspark/tests/test_broadcast.py ---
@@ -67,6 +67,20 @@ def test_broadcast_with_encryption(self):
     def test_broadcast_no_encryption(self):
         self._test_multiple_broadcasts()

+    def _test_broadcast_on_driver(self, *extra_confs):
+        conf = SparkConf()
+        for key, value in extra_confs:
+            conf.set(key, value)
+        conf.setMaster("local-cluster[2,1,1024]")
+        self.sc = SparkContext(conf=conf)
+        bs = self.sc.broadcast(value=5)
+        self.assertEqual(5, bs.value)
+
+    def test_broadcast_value_driver_no_encryption(self):
+        self._test_broadcast_on_driver()
+
+    def test_broadcast_value_driver_encryption(self):
+        self.self._test_broadcast_on_driver(("spark.io.encryption.enabled", "true"))
--- End diff --

You have an extra `.self` here.
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/23166 [SPARK-26201] Fix python broadcast with encryption

## What changes were proposed in this pull request?

With RPC and disk encryption enabled, a job that created a Python broadcast variable and simply read its value back on the driver side failed with:

Traceback (most recent call last):
  File "broadcast.py", line 37, in
    words_new.value
  File "/pyspark.zip/pyspark/broadcast.py", line 137, in value
  File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path
  File "pyspark.zip/pyspark/broadcast.py", line 128, in load
EOFError: Ran out of input

To reproduce, use the configs:

--conf spark.network.crypto.enabled=true --conf spark.io.encryption.enabled=true

Code:

words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
words_new.value
print(words_new.value)

## How was this patch tested?

words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
textFile = sc.textFile("README.md")
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a + b)
count = wordCounts.count()
print(count)
words_new.value
print(words_new.value)
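The "EOFError: Ran out of input" in the traceback above is the error pickle raises when asked to deserialize from an empty file, which is what happens when the driver reads from the stale path while the encrypted payload went to a different file. A minimal, Spark-free sketch of just that failure mode (the temp file here is an assumption standing in for the stale broadcast path):

```python
import os
import pickle
import tempfile

# Create a file that exists but holds no data, like the stale broadcast path.
fd, empty_path = tempfile.mkstemp()
os.close(fd)

try:
    with open(empty_path, "rb") as f:
        pickle.load(f)  # nothing to read: pickle hits EOF immediately
    failed = False
except EOFError as e:
    failed = True
    print(e)  # "Ran out of input"

print(failed)
```

This is why the fix keeps the Python-side `self._path` and the JVM-side `PythonBroadcast` path pointing at the same file.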
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/redsanket/spark SPARK-26201

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23166.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23166

commit 67a2ac87fb6e2d3fd4a5f260047a37bd2858228d
Author: schintap
Date: 2018-11-28T16:20:55Z

[SPARK-26201] Fix python broadcast with encryption