[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-13 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/22404


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217154410
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -692,5 +681,238 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     }
     super.finalize()
   }
+
+  def setupEncryptionServer(): Array[Any] = {
+    encryptionServer = new PythonServer[Unit]("broadcast-encrypt-server") {
+      override def handleConnection(sock: Socket): Unit = {
+        val env = SparkEnv.get
+        val in = sock.getInputStream()
+        val dir = new File(Utils.getLocalDir(env.conf))
+        val file = File.createTempFile("broadcast", "", dir)
+        path = file.getAbsolutePath
+        val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+        DechunkedInputStream.dechunkAndCopyToOutput(in, out)
+      }
+    }
+    Array(encryptionServer.port, encryptionServer.secret)
+  }
+
+  def waitTillDataReceived(): Unit = encryptionServer.getResult()
 }
 // scalastyle:on no.finalize
+
+/**
+ * The inverse of pyspark's ChunkedStream for sending broadcast data.
+ * Tested from python tests.
+ */
+private[spark] class DechunkedInputStream(wrapped: InputStream) extends InputStream with Logging {
+  private val din = new DataInputStream(wrapped)
+  private var remainingInChunk = din.readInt()
+
+  override def read(): Int = {
+    val into = new Array[Byte](1)
+    val n = read(into, 0, 1)
+    if (n == -1) {
+      -1
+    } else {
+      // if you just cast a byte to an int, then anything > 127 is negative, which is interpreted
+      // as an EOF
+      val b = into(0)
+      if (b < 0) {
--- End diff --

Pardon, is this just trying to treat it as an unsigned byte? Then just `b & 0xFF`?
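The masking trick srowen suggests can be illustrated outside the JVM as well. The sketch below (illustrative names, not Spark code) shows why a raw signed byte is ambiguous with the stream's EOF marker, and how `b & 0xFF` recovers the unsigned value:

```python
import struct

def to_unsigned(b: int) -> int:
    # Mirror Java's `b & 0xFF`: map a signed byte (-128..127) into 0..255.
    return b & 0xFF

# The byte 0xFF interpreted as signed is -1, which a Java-style read()
# would misreport as EOF if returned unmasked.
signed = struct.unpack("b", bytes([0xFF]))[0]
assert signed == -1
assert to_unsigned(signed) == 255
assert to_unsigned(127) == 127  # values <= 127 are unchanged
```

The same masking idiom works identically in Scala and Java, since `&` widens the byte to an int before applying the mask.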


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217154988
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -193,19 +193,51 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val newBids = broadcastVars.map(_.id).toSet
     // number of different broadcasts
     val toRemove = oldBids.diff(newBids)
-    val cnt = toRemove.size + newBids.diff(oldBids).size
+    val addedBids = newBids.diff(oldBids)
+    val cnt = toRemove.size + addedBids.size
+    val needsDecryptionServer = env.serializerManager.encryptionEnabled && addedBids.nonEmpty
+    dataOut.writeBoolean(needsDecryptionServer)
     dataOut.writeInt(cnt)
-    for (bid <- toRemove) {
-      // remove the broadcast from worker
-      dataOut.writeLong(- bid - 1)  // bid >= 0
-      oldBids.remove(bid)
+    def sendBidsToRemove(): Unit = {
+      for (bid <- toRemove) {
+        // remove the broadcast from worker
+        dataOut.writeLong(-bid - 1) // bid >= 0
+        oldBids.remove(bid)
+      }
     }
-    for (broadcast <- broadcastVars) {
-      if (!oldBids.contains(broadcast.id)) {
+    if (needsDecryptionServer) {
+      // if there is encryption, we setup a server which reads the encrypted files, and sends
+      // the decrypted data to python
+      val idsAndFiles = broadcastVars.flatMap { broadcast =>
+        if (!oldBids.contains(broadcast.id)) {
--- End diff --

Nit: flip the if condition for clarity?
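The "flip the condition" nit amounts to filtering up front rather than nesting the useful branch under a negation. A minimal sketch of the idea (hypothetical stand-in names, not the actual Scala):

```python
from collections import namedtuple

# Hypothetical stand-in for a broadcast variable's id and backing file.
Broadcast = namedtuple("Broadcast", ["id", "path"])

def added_broadcasts(broadcast_vars, old_bids):
    # Filtering on the positive question ("is this id new?") up front
    # avoids nesting the whole body under `if (!oldBids.contains(...))`.
    return [(b.id, b.path) for b in broadcast_vars if b.id not in old_bids]

bvars = [Broadcast(1, "/tmp/a"), Broadcast(2, "/tmp/b")]
assert added_broadcasts(bvars, {1}) == [(2, "/tmp/b")]
```

In the Scala `flatMap`, the equivalent flip would yield the `Some(...)` branch first and fall through to `None`, which reads the same way.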


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217153676
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A DownloadFile that does not take any encryption settings into account for reading and
+ * writing data.
+ *
+ * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
+ * already encrypted when it's written, and a subsequent layer is responsible for decrypting.
+ */
+public class SimpleDownloadFile implements DownloadFile {
+  private final File file;
+  private final TransportConf transportConf;
+
+  public SimpleDownloadFile(File file, TransportConf transportConf) {
+    this.file = file;
+    this.transportConf = transportConf;
+  }
+
+  @Override
+  public boolean delete() {
+    return file.delete();
+  }
+
+  @Override
+  public DownloadFileWritableChannel openForWriting() {
+    try {
+      return new SimpleDownloadWritableChannel();
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(e);
--- End diff --

Is it bad to just let the method declare that it throws IOException for FileNotFoundException? I know they're checked, but it's more precise.


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217153478
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A DownloadFile that does not take any encryption settings into account for reading and
+ * writing data.
+ *
+ * This does *not* mean the data in the file is un-encrypted -- it could be that the data is
+ * already encrypted when it's written, and a subsequent layer is responsible for decrypting.
+ */
+public class SimpleDownloadFile implements DownloadFile {
+  private final File file;
+  private final TransportConf transportConf;
+
+  public SimpleDownloadFile(File file, TransportConf transportConf) {
+    this.file = file;
+    this.transportConf = transportConf;
+  }
+
+  @Override
+  public boolean delete() {
+    return file.delete();
+  }
+
+  @Override
+  public DownloadFileWritableChannel openForWriting() {
+    try {
+      return new SimpleDownloadWritableChannel();
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String path() {
+    return file.getAbsolutePath();
+  }
+
+  private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel {
+    private final WritableByteChannel channel;
--- End diff --

More nits, would put blank lines around this


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217153096
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+/**
+ * A handle on the file used when fetching remote data to disk.  Used to ensure the lifecycle of
+ * writing the data, reading it back, and then cleaning it up is followed.  Specific implementations
+ * may also handle encryption.  The data can be read only via DownloadFileWritableChannel,
+ * which ensures data is not read until after the writer is closed.
+ */
+public interface DownloadFile {
+  /**
+   * Delete the file.
+   *
+   * @return true if and only if the file or directory is successfully deleted; false otherwise
+   */
+  public boolean delete();
+
+  /**
+   * A channel for writing data to the file.  This special channel allows access to the data for
+   * reading, after the channel is closed, via {@link DownloadFileWritableChannel#closeAndRead()}.
+   */
+  public DownloadFileWritableChannel openForWriting();
+
+  /**
+   * The path of the file, intended only for debug purposes.
+   * @return
--- End diff --

These are just nits, but you can put the text on the line above in the tag here.


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217153301
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
--- End diff --

Super nit, I think this can be preceded by a blank line


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217153554
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+import java.io.*;
--- End diff --

Usually we unroll star imports, but not a big deal


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22404#discussion_r217155746
  
--- Diff: python/pyspark/context.py ---
@@ -499,19 +506,32 @@ def f(split, iterator):
 
     def _serialize_to_jvm(self, data, parallelism, serializer):
         """
-        Calling the Java parallelize() method with an ArrayList is too slow,
-        because it sends O(n) Py4J commands.  As an alternative, serialized
-        objects are written to a file and loaded through textFile().
-        """
-        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        try:
-            serializer.dump_stream(data, tempFile)
-            tempFile.close()
-            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
-            return readRDDFromFile(self._jsc, tempFile.name, parallelism)
-        finally:
-            # readRDDFromFile eagerly reads the file so we can delete right after.
-            os.unlink(tempFile.name)
+        Using py4j to send a large dataset to the jvm is really slow, so we use either a file
+        or a socket if we have encryption enabled.
+        """
+        if self._encryption_enabled:
+            # with encryption, we open a server in java and send the data directly
+            server = self._jvm.PythonParallelizeServer(self._jsc.sc(), parallelism)
+            (sock_file, _) = local_connect_and_auth(server.port(), server.secret())
+            chunked_out = ChunkedStream(sock_file, 8192)
+            serializer.dump_stream(data, chunked_out)
+            chunked_out.close()
+            # this call will block until the server has read all the data and processed it (or
+            # throws an exception)
+            r = server.getResult()
+            return r
--- End diff --

Nit, do you want to just `return server.getResult()` in cases like this?
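The chunked transfer this diff relies on (ChunkedStream on the Python side, DechunkedInputStream on the JVM side) is a simple length-prefixed framing. The sketch below is an illustration only: the exact wire format of pyspark's ChunkedStream (big-endian int lengths, -1 as the end marker) is an assumption here, not taken from the diff.

```python
import io
import struct

def write_chunked(payload: bytes, out, chunk_size: int = 8192) -> None:
    # Each chunk is preceded by its size as a big-endian int ("!i");
    # a -1 length marks end-of-stream (assumed framing, for illustration).
    for start in range(0, len(payload), chunk_size):
        chunk = payload[start:start + chunk_size]
        out.write(struct.pack("!i", len(chunk)))
        out.write(chunk)
    out.write(struct.pack("!i", -1))

def read_dechunked(inp) -> bytes:
    # The inverse: read size-prefixed chunks until the -1 marker.
    result = bytearray()
    while True:
        (size,) = struct.unpack("!i", inp.read(4))
        if size == -1:
            return bytes(result)
        result.extend(inp.read(size))

buf = io.BytesIO()
write_chunked(b"x" * 20000, buf, chunk_size=8192)
buf.seek(0)
assert read_dechunked(buf) == b"x" * 20000
```

Framing like this is what lets the JVM-side reader distinguish "no more data" from a plain socket close, which matters when the stream is wrapped for encryption.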


---




[GitHub] spark pull request #22404: DO NOT MERGE

2018-09-12 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/22404

DO NOT MERGE

just for testing

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark assorted_2.3_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22404


commit f1c86c92bb7c14174027e40a890fd8e22c707887
Author: Imran Rashid 
Date:   2018-08-14T02:35:34Z

[PYSPARK] Updates to pyspark broadcast

commit 2be676d948f88719326aaeb000ebfde1d2049e0d
Author: Imran Rashid 
Date:   2018-09-06T17:11:47Z

[PYSPARK][SQL] Updates to RowQueue

Tested with updates to RowQueueSuite

commit 1b4e8ebf147e75c7a07e7a547b671f44a9cc7042
Author: Imran Rashid 
Date:   2018-08-22T21:38:28Z

[CORE] Updates to remote cache reads

Covered by tests in DistributedSuite




---
