[GitHub] spark pull request #22404: DO NOT MERGE
Github user squito closed the pull request at: https://github.com/apache/spark/pull/22404 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217154410

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -692,5 +681,238 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
         }
         super.finalize()
       }
    +
    +  def setupEncryptionServer(): Array[Any] = {
    +    encryptionServer = new PythonServer[Unit]("broadcast-encrypt-server") {
    +      override def handleConnection(sock: Socket): Unit = {
    +        val env = SparkEnv.get
    +        val in = sock.getInputStream()
    +        val dir = new File(Utils.getLocalDir(env.conf))
    +        val file = File.createTempFile("broadcast", "", dir)
    +        path = file.getAbsolutePath
    +        val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
    +        DechunkedInputStream.dechunkAndCopyToOutput(in, out)
    +      }
    +    }
    +    Array(encryptionServer.port, encryptionServer.secret)
    +  }
    +
    +  def waitTillDataReceived(): Unit = encryptionServer.getResult()
     }
     // scalastyle:on no.finalize
    +
    +/**
    + * The inverse of pyspark's ChunkedStream for sending broadcast data.
    + * Tested from python tests.
    + */
    +private[spark] class DechunkedInputStream(wrapped: InputStream) extends InputStream with Logging {
    +  private val din = new DataInputStream(wrapped)
    +  private var remainingInChunk = din.readInt()
    +
    +  override def read(): Int = {
    +    val into = new Array[Byte](1)
    +    val n = read(into, 0, 1)
    +    if (n == -1) {
    +      -1
    +    } else {
    +      // if you just cast a byte to an int, then anything > 127 is negative, which is interpreted
    +      // as an EOF
    +      val b = into(0)
    +      if (b < 0) {
    --- End diff --

    Pardon, is this just trying to treat it as an unsigned byte? Then just `b & 0xFF`?
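srowen's suggestion is the standard signed-to-unsigned byte trick: a JVM byte with the high bit set is negative, which `InputStream.read()` callers would confuse with the -1 end-of-stream sentinel. A minimal sketch of the same bit arithmetic in Python, using `struct`'s signed-byte format to mirror JVM semantics (the values are illustrative):

```python
import struct

# 0x96 (150) has the high bit set, so it unpacks as a negative signed byte,
# just as a JVM byte would be negative when widened to an int by a plain cast.
(signed,) = struct.unpack("b", bytes([0x96]))
assert signed == -106

# Masking with 0xFF reinterprets the same bits as an unsigned value in 0..255,
# which is what InputStream.read() is specified to return.
unsigned = signed & 0xFF
assert unsigned == 150
```

The mask works because `& 0xFF` keeps only the low 8 bits of the (sign-extended) integer, so the explicit `if (b < 0)` branch in the diff becomes unnecessary.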
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217154988

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -193,19 +193,51 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             val newBids = broadcastVars.map(_.id).toSet
             // number of different broadcasts
             val toRemove = oldBids.diff(newBids)
    -        val cnt = toRemove.size + newBids.diff(oldBids).size
    +        val addedBids = newBids.diff(oldBids)
    +        val cnt = toRemove.size + addedBids.size
    +        val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty
    +        dataOut.writeBoolean(needsDecryptionServer)
             dataOut.writeInt(cnt)
    -        for (bid <- toRemove) {
    -          // remove the broadcast from worker
    -          dataOut.writeLong(- bid - 1)  // bid >= 0
    -          oldBids.remove(bid)
    +        def sendBidsToRemove(): Unit = {
    +          for (bid <- toRemove) {
    +            // remove the broadcast from worker
    +            dataOut.writeLong(-bid - 1)  // bid >= 0
    +            oldBids.remove(bid)
    +          }
             }
    -        for (broadcast <- broadcastVars) {
    -          if (!oldBids.contains(broadcast.id)) {
    +        if (needsDecryptionServer) {
    +          // if there is encryption, we setup a server which reads the encrypted files, and sends
    +          // the decrypted data to python
    +          val idsAndFiles = broadcastVars.flatMap { broadcast =>
    +            if (!oldBids.contains(broadcast.id)) {
    --- End diff --

    Nit: flip the if condition for clarity?
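The bookkeeping in this diff boils down to two set differences: broadcasts the worker holds but the task no longer needs, and broadcasts the task needs that the worker lacks. A small sketch of the same logic with plain Python sets (the ids are made up):

```python
old_bids = {0, 1, 2}       # broadcasts the python worker already has
new_bids = {1, 2, 5, 7}    # broadcasts referenced by the current task

to_remove = old_bids - new_bids   # held by the worker but no longer needed
added = new_bids - old_bids       # must be shipped to the worker

assert to_remove == {0}
assert added == {5, 7}

# the count written to the worker covers both removals and additions,
# mirroring `toRemove.size + addedBids.size` in the Scala code
cnt = len(to_remove) + len(added)
assert cnt == 3
```

Computing `addedBids` once (rather than recomputing `newBids.diff(oldBids)`) is what lets the patch also derive `needsDecryptionServer` from whether any broadcasts were added.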
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217153676

    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.network.shuffle;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +import java.io.*;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +
    +/**
    + * A DownloadFile that does not take any encryption settings into account for reading and
    + * writing data.
    + *
    + * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
    + * already encrypted when its written, and subsequent layer is responsible for decrypting.
    + */
    +public class SimpleDownloadFile implements DownloadFile {
    +
    +  private final File file;
    +  private final TransportConf transportConf;
    +
    +  public SimpleDownloadFile(File file, TransportConf transportConf) {
    +    this.file = file;
    +    this.transportConf = transportConf;
    +  }
    +
    +  @Override
    +  public boolean delete() {
    +    return file.delete();
    +  }
    +
    +  @Override
    +  public DownloadFileWritableChannel openForWriting() {
    +    try {
    +      return new SimpleDownloadWritableChannel();
    +    } catch (FileNotFoundException e) {
    +      throw new RuntimeException(e);
    --- End diff --

    Is it bad to just let the method declare that it throws IOException for FileNotFoundException? I know they're checked, but it's more precise
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217153478

    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.network.shuffle;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +import java.io.*;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.WritableByteChannel;
    +
    +/**
    + * A DownloadFile that does not take any encryption settings into account for reading and
    + * writing data.
    + *
    + * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
    + * already encrypted when its written, and subsequent layer is responsible for decrypting.
    + */
    +public class SimpleDownloadFile implements DownloadFile {
    +
    +  private final File file;
    +  private final TransportConf transportConf;
    +
    +  public SimpleDownloadFile(File file, TransportConf transportConf) {
    +    this.file = file;
    +    this.transportConf = transportConf;
    +  }
    +
    +  @Override
    +  public boolean delete() {
    +    return file.delete();
    +  }
    +
    +  @Override
    +  public DownloadFileWritableChannel openForWriting() {
    +    try {
    +      return new SimpleDownloadWritableChannel();
    +    } catch (FileNotFoundException e) {
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  @Override
    +  public String path() {
    +    return file.getAbsolutePath();
    +  }
    +
    +  private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
    +    private final WritableByteChannel channel;
    --- End diff --

    More nits, would put blank lines around this
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217153096

    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.network.shuffle;
    +
    +/**
    + * A handle on the file used when fetching remote data to disk.  Used to ensure the lifecycle of
    + * writing the data, reading it back, and then cleaning it up is followed.  Specific implementations
    + * may also handle encryption.  The data can be read only via DownloadFileWritableChannel,
    + * which ensures data is not read until after the writer is closed.
    + */
    +public interface DownloadFile {
    +  /**
    +   * Delete the file.
    +   *
    +   * @return true if and only if the file or directory is
    +   *         successfully deleted; false otherwise
    +   */
    +  public boolean delete();
    +
    +  /**
    +   * A channel for writing data to the file.  This special channel allows access to the data for
    +   * reading, after the channel is closed, via {@link DownloadFileWritableChannel#closeAndRead()}.
    +   */
    +  public DownloadFileWritableChannel openForWriting();
    +
    +  /**
    +   * The path of the file, intended only for debug purposes.
    +   * @return
    --- End diff --

    These are just nits, but you can move the text from the line above into the @return tag here
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217153301

    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.network.shuffle;
    --- End diff --

    Super nit, I think this can be preceded by a blank line
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217153554

    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.network.shuffle;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +import java.io.*;
    --- End diff --

    Usually we unroll star imports, but not a big deal
[GitHub] spark pull request #22404: DO NOT MERGE
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22404#discussion_r217155746

    --- Diff: python/pyspark/context.py ---
    @@ -499,19 +506,32 @@ def f(split, iterator):
     
         def _serialize_to_jvm(self, data, parallelism, serializer):
             """
    -        Calling the Java parallelize() method with an ArrayList is too slow,
    -        because it sends O(n) Py4J commands.  As an alternative, serialized
    -        objects are written to a file and loaded through textFile().
    -        """
    -        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
    -        try:
    -            serializer.dump_stream(data, tempFile)
    -            tempFile.close()
    -            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    -            return readRDDFromFile(self._jsc, tempFile.name, parallelism)
    -        finally:
    -            # readRDDFromFile eagerily reads the file so we can delete right after.
    -            os.unlink(tempFile.name)
    +        Using py4j to send a large dataset to the jvm is really slow, so we use either a file
    +        or a socket if we have encryption enabled.
    +        """
    +        if self._encryption_enabled:
    +            # with encryption, we open a server in java and send the data directly
    +            server = self._jvm.PythonParallelizeServer(self._jsc.sc(), parallelism)
    +            (sock_file, _) = local_connect_and_auth(server.port(), server.secret())
    +            chunked_out = ChunkedStream(sock_file, 8192)
    +            serializer.dump_stream(data, chunked_out)
    +            chunked_out.close()
    +            # this call will block until the server has read all the data and processed it (or
    +            # throws an exception)
    +            r = server.getResult()
    +            return r
    --- End diff --

    Nit, do you want to just `return server.getResult()` in cases like this?
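The ChunkedStream used here frames the serialized data as length-prefixed chunks so the JVM side (the DechunkedInputStream discussed above) can tell where the payload ends without closing the socket. A rough sketch of that framing, assuming 4-byte big-endian length prefixes and a -1 terminator (my reading of the protocol, not the exact pyspark implementation):

```python
import io
import struct

def write_chunked(out, data, chunk_size=8192):
    # each chunk is preceded by its byte length as a big-endian int;
    # a length of -1 marks end of stream
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        out.write(struct.pack("!i", len(chunk)))
        out.write(chunk)
    out.write(struct.pack("!i", -1))

def read_dechunked(inp):
    # inverse operation: keep reading length-prefixed chunks until the
    # -1 terminator, concatenating the chunk bodies
    result = bytearray()
    while True:
        (length,) = struct.unpack("!i", inp.read(4))
        if length == -1:
            return bytes(result)
        result.extend(inp.read(length))

# round-trip a payload that spans multiple 8192-byte chunks
buf = io.BytesIO()
payload = b"x" * 20000
write_chunked(buf, payload)
buf.seek(0)
assert read_dechunked(buf) == payload
```

In-band framing like this is what lets `server.getResult()` block until the JVM has consumed the whole payload while the socket stays open for the result.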
[GitHub] spark pull request #22404: DO NOT MERGE
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/22404

    DO NOT MERGE

    just for testing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark assorted_2.3_fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22404

----
commit f1c86c92bb7c14174027e40a890fd8e22c707887
Author: Imran Rashid
Date:   2018-08-14T02:35:34Z

    [PYSPARK] Updates to pyspark broadcast

commit 2be676d948f88719326aaeb000ebfde1d2049e0d
Author: Imran Rashid
Date:   2018-09-06T17:11:47Z

    [PYSPARK][SQL] Updates to RowQueue

    Tested with updates to RowQueueSuite

commit 1b4e8ebf147e75c7a07e7a547b671f44a9cc7042
Author: Imran Rashid
Date:   2018-08-22T21:38:28Z

    [CORE] Updates to remote cache reads

    Covered by tests in DistributedSuite