Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2024-01-01 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1439137984


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+

Review Comment:
   We are going to make this developer API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2024-01-01 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1439122099


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)
+  handleConnection(sock)
+} finally {
+  JavaUtils.closeQuietly(sock)
+}
+  }
+}.start()
+  }
+
+  def start(): Unit = {
+logTrace("Creating listening socket")
+
+new Thread(s"CachedArrowBatchServer-listener") {
+  setDaemon(true)
+
+  override def run(): Unit = {
+var sock: Socket = null
+
+var connectionCount = 0
+try {
+  while (true) {
+sock = serverSocket.accept()
+connectionCount += 1
+createConnectionThread(

Review Comment:
   I prefer to use thread instead of threadpool because of 
https://github.com/apache/spark/pull/44294#discussion_r1427999506 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2024-01-01 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1439121654


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)

Review Comment:
   No. "write a string ok " will write the length (sending 4 bytes) of the 
string first, then write utf8 encoded bytes of the string, for receiver side, 
it reads string length first then decode following bytes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2024-01-01 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1439121654


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)

Review Comment:
   No. "write a string ok " will write the length of the string first, then 
write utf8 encoded bytes of the string, for receiver side, it reads string 
length first then decode following bytes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438071149


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)

Review Comment:
   `authClient` write a string `ok` to the socket if authentication pass. Do 
you think it would pollute the data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438068517


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)
+  handleConnection(sock)
+} finally {
+  JavaUtils.closeQuietly(sock)
+}
+  }
+}.start()
+  }
+
+  def start(): Unit = {
+logTrace("Creating listening socket")
+
+new Thread(s"CachedArrowBatchServer-listener") {
+  setDaemon(true)
+
+  override def run(): Unit = {
+var sock: Socket = null
+
+var connectionCount = 0
+try {
+  while (true) {
+sock = serverSocket.accept()
+connectionCount += 1
+createConnectionThread(

Review Comment:
   It still uses a fresh thread rather than a thread pool?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438068013


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -99,6 +99,10 @@ class SparkEnv (
 
   private[spark] var executorBackend: Option[ExecutorBackend] = None
 
+  private[spark] var cachedArrowBatchServerPort: Option[Int] = None
+
+  private[spark] var cachedArrowBatchServerSecret: Option[String] = None

Review Comment:
   And, actually, I think we can create the `CachedArrowBatchServer` instance 
in `SparkEnv` so that driver and executor can reuse it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r143805


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -99,6 +99,10 @@ class SparkEnv (
 
   private[spark] var executorBackend: Option[ExecutorBackend] = None
 
+  private[spark] var cachedArrowBatchServerPort: Option[Int] = None
+
+  private[spark] var cachedArrowBatchServerSecret: Option[String] = None

Review Comment:
   Shall we assign the values for driver too? Thus, driver and executor would 
have the consistent behaviour, e.g., both can access the vlaue through these 
local vars instead of the server itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438060364


##
core/src/main/scala/org/apache/spark/internal/config/Python.scala:
##
@@ -69,4 +69,12 @@ private[spark] object Python {
 .version("3.2.0")
 .booleanConf
 .createWithDefault(false)
+
+  val PYTHON_DATAFRAME_CHUNK_READ_ENABLED =
+ConfigBuilder("spark.python.dataFrameChunkRead.enabled")
+.doc("When true, driver and executes launch local cached arrow batch 
servers for serving " +

Review Comment:
   executors?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438025198


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   I think we surpress the expection during block removal already, see 
[BlockManagerMasterEndpoint#handleBlockRemovalFailure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L309).
 Even if the exception escapes from 
`BlockManagerMasterEndpoint#handleBlockRemovalFailure` in the corner case, 
[Inbox#safelyCall](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202)
 would surpress it too. But please at least log a warning here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438025198


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   I think we surpress the expection during block removal already, see 
[BlockManagerMasterEndpoint#handleBlockRemovalFailure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L309).
 Even if the exception escapes from 
`BlockManagerMasterEndpoint#handleBlockRemovalFailure` in the corner case, 
[Inbox#safeCall](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202)
 would surpress it too. But please at least log a warning here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1438025198


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   I think surpress the expection during block removal already, see 
[BlockManagerMasterEndpoint#handleBlockRemovalFailure](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L309).
 But please at least log a warning here.



##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import 

Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437961373


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   We suppress the exception, so we prevent the exception to interrupt the 
loop. We have following blocks in the loop to be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437960948


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   Or we can print a warning here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437960948


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   If we do nothing in catch block, then the exception is raised , and we lost 
tracking on these persisted blocks, i.e., we have no way remove these blocks if 
we don't remove them in the catch block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437960050


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,
+then it fallbacks to persist spilled data to spark worker local disk.
+Return the list of tuple (chunk_id, chunk_row_count, chunk_byte_count).
+This function is only available when it is called from spark driver 
process.
+"""
+spark = dataframe.sparkSession
+if spark is None:
+raise PySparkRuntimeError("Active spark session is required.")
+
+sc = spark.sparkContext
+if sc.getConf().get("spark.python.dataFrameChunkRead.enabled", 
"false").lower() != "true":

Review Comment:
   Putting it in StaticSQLConf sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437929666


##
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##
@@ -92,6 +92,9 @@ private[sql] object ArrowConverters extends Logging {
 private val root = VectorSchemaRoot.create(arrowSchema, allocator)
 protected val unloader = new VectorUnloader(root)
 protected val arrowWriter = ArrowWriter.create(root)
+private var _lastBatchRowCount: Long = -1L

Review Comment:
   Let's name it `rowCountInLastBatch` to be consistent with 
`ArrowBatchWithSchemaIterator` below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437929500


##
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##
@@ -92,6 +92,9 @@ private[sql] object ArrowConverters extends Logging {
 private val root = VectorSchemaRoot.create(arrowSchema, allocator)
 protected val unloader = new VectorUnloader(root)
 protected val arrowWriter = ArrowWriter.create(root)
+private var _lastBatchRowCount: Long = -1L

Review Comment:
   Let's don't use `_` prefix. Just use the name like `lastBatchRowCount`, and 
`getLastBatchRowCount`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437929286


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()
+  }
+}
+throw e
+}
+
+chunkMetaList.iterator
+  }
+}
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluatorFactory(

Review Comment:
   Hey please add corresponding scaladoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437929213


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean,
+maxRecordsPerBatch: Long
+) extends PartitionEvaluator[InternalRow, ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[InternalRow]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+val context = TaskContext.get()
+val arrowBatchIter = ArrowConverters.toBatchIterator(
+  inputs(0), schema, maxRecordsPerBatch, timeZoneId,
+  errorOnDuplicatedFieldNames, context
+)
+
+try {
+  while (arrowBatchIter.hasNext) {
+val arrowBatch = arrowBatchIter.next()
+val rowCount = arrowBatchIter.lastBatchRowCount
+
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()

Review Comment:
   Hm, I don't see this kind of pattern in the current usage of `removeBlock` 
in the codebase. Mind explaning a bit why we should catch and surpress the 
exception during block removal? cc @Ngone51 too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437928720


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(

Review Comment:
   Please add Scaladoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437928372


##
python/pyspark/sql/tests/test_chunk_read_api.py:
##
@@ -0,0 +1,122 @@
+#

Review Comment:
   Can you add these files into `modules.py` at `dev/sparktestsupport`? Tests 
are not running in CI now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437928051


##
core/src/main/scala/org/apache/spark/internal/config/Python.scala:
##
@@ -69,4 +69,12 @@ private[spark] object Python {
 .version("3.2.0")
 .booleanConf
 .createWithDefault(false)
+
+  val PYTHON_DATAFRAME_CHUNK_READ_ENABLED =
+ConfigBuilder("spark.python.dataFrameChunkRead.enabled")
+.doc("When true, driver and executes launch local cached arrow batch 
servers for serving " +
+  "pyspark DataFrame chunk read requests.")

Review Comment:
   Can we mention which API to call? at least `pyspark.sql.chunk_api`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437927991


##
core/src/main/scala/org/apache/spark/internal/config/Python.scala:
##
@@ -69,4 +69,12 @@ private[spark] object Python {
 .version("3.2.0")
 .booleanConf
 .createWithDefault(false)
+
+  val PYTHON_DATAFRAME_CHUNK_READ_ENABLED =
+ConfigBuilder("spark.python.dataFrameChunkRead.enabled")
+.doc("When true, driver and executes launch local cached arrow batch 
servers for serving " +

Review Comment:
   ```suggestion
   .doc("When true, driver and executers launch local cached Arrow batch 
servers for serving " +
   ```



##
core/src/main/scala/org/apache/spark/internal/config/Python.scala:
##
@@ -69,4 +69,12 @@ private[spark] object Python {
 .version("3.2.0")
 .booleanConf
 .createWithDefault(false)
+
+  val PYTHON_DATAFRAME_CHUNK_READ_ENABLED =
+ConfigBuilder("spark.python.dataFrameChunkRead.enabled")
+.doc("When true, driver and executes launch local cached arrow batch 
servers for serving " +
+  "pyspark DataFrame chunk read requests.")

Review Comment:
   ```suggestion
 "PySpark DataFrame chunk read requests.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437927961


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,

Review Comment:
   You should also mention that we should enable 
`spark.python.dataFrameChunkRead.enabled` conf



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926770


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(

Review Comment:
   Should be listed in 
https://github.com/apache/spark/tree/master/python/docs/source/reference/pyspark.sql
 for documentation



##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,
+then it fallbacks to persist spilled data to spark worker local disk.
+Return the list of tuple (chunk_id, chunk_row_count, chunk_byte_count).
+This function is only available when it is called from spark driver 
process.

Review Comment:
   Can we explicitly mark it as a developer API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437927742


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])

Review Comment:
   I would name it with one word.
   
   ```suggestion
   ChunkMeta = namedtuple("ChunkMeta", ["id", "rcount", "bcount"])
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437927411


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(
+  val sparkConf: SparkConf,
+  val blockManager: BlockManager
+) extends Logging {
+
+  val authHelper = new SocketAuthHelper(sparkConf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+var errMessage = "ok"
+var blockDataOpt: Option[Array[Byte]] = None
+
+try {
+  val blockResult = blockManager.get[Array[Byte]](blockId)
+  if (blockResult.isDefined) {
+blockDataOpt = 
Some(blockResult.get.data.next().asInstanceOf[Array[Byte]])
+  } else {
+errMessage = s"The chunk $blockId data cache does not exist or has 
been removed"
+  }
+} catch {
+  case e: Exception =>
+errMessage = e.getMessage
+}
+
+writeUtf8(errMessage, sock)
+
+if (blockDataOpt.isDefined) {
+  val out = new BufferedOutputStream(sock.getOutputStream())
+  out.write(blockDataOpt.get)
+  out.flush()
+}
+  }
+
+  def createConnectionThread(sock: Socket, threadName: String): Unit = {
+new Thread(threadName) {
+  setDaemon(true)
+
+  override def run(): Unit = {
+try {
+  authHelper.authClient(sock)
+  handleConnection(sock)
+} finally {
+  JavaUtils.closeQuietly(sock)
+}
+  }
+}.start()
+  }
+
+  def start(): Unit = {
+logTrace("Creating listening socket")
+
+new Thread(s"CachedArrowBatchServer-listener") {

Review Comment:
   ```suggestion
   new Thread("CachedArrowBatchServer-listener") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437927145


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#

Review Comment:
   module name shouldn't better have `_` there especially for user-facing one. 
I would just rename it to `chunk` from `chunk_api`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926996


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,
+then it fallbacks to persist spilled data to spark worker local disk.
+Return the list of tuple (chunk_id, chunk_row_count, chunk_byte_count).
+This function is only available when it is called from spark driver 
process.
+"""
+spark = dataframe.sparkSession
+if spark is None:
+raise PySparkRuntimeError("Active spark session is required.")
+
+sc = spark.sparkContext
+if sc.getConf().get("spark.python.dataFrameChunkRead.enabled", 
"false").lower() != "true":

Review Comment:
   Is this SQL API or core API? If it's a SQL API, we should put this into 
`StaticSQLConf`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926884


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,
+then it fallbacks to persist spilled data to spark worker local disk.
+Return the list of tuple (chunk_id, chunk_row_count, chunk_byte_count).
+This function is only available when it is called from spark driver 
process.
+"""
+spark = dataframe.sparkSession
+if spark is None:
+raise PySparkRuntimeError("Active spark session is required.")

Review Comment:
   Use error class. @itholic why can't linter catch this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926833


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa

Review Comment:
   Should check if the Arrow is installed or not. Use 
`require_minimum_pyarrow_version`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926788


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+

Review Comment:
   Define `__all__` for public API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926770


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(

Review Comment:
   Should be listed in 
https://github.com/apache/spark/tree/master/python/docs/source/reference/pyspark.sql
 for documentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926648


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.

Review Comment:
   Let's follow numpydoc style 
(https://numpydoc.readthedocs.io/en/latest/format.html). Should also add 
version added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


HyukjinKwon commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437926486


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(

Review Comment:
   `persistDataframeAsChunks` and same for below to be naming consistency



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-28 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1437532433


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, BlockManager}
+
+
+class CachedArrowBatchServer(

Review Comment:
   shall we add a classdoc? To briefly introduce how this service works



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-27 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436987470


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, 
SparkEnv, TaskContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
+import org.apache.spark.sql.types.{StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean
+) extends PartitionEvaluator[(Array[Byte], Long), ChunkMeta] {
+
+  def eval(partitionIndex: Int, inputs: Iterator[(Array[Byte], Long)]*): 
Iterator[ChunkMeta] = {
+val blockManager = SparkEnv.get.blockManager
+val chunkMetaList = new ArrayBuffer[ChunkMeta]()
+
+try {
+  for ((arrowBatch, rowCount) <- inputs(0)) {
+val uuid = java.util.UUID.randomUUID()
+val blockId = ArrowBatchBlockId(uuid)
+
+val out = new ByteArrayOutputStream(32 * 1024 * 1024)
+
+val batchWriter =
+  new ArrowBatchStreamWriter(schema, out, timeZoneId, 
errorOnDuplicatedFieldNames)
+
+batchWriter.writeBatches(Iterator.single(arrowBatch))
+batchWriter.end()
+
+val blockData = out.toByteArray
+
+blockManager.putSingle[Array[Byte]](
+  blockId, blockData, StorageLevel.MEMORY_AND_DISK, tellMaster = true
+)
+chunkMetaList.append(
+  ChunkMeta(blockId.toString, rowCount, blockData.length)
+)
+  }
+} catch {
+  case e: Exception =>
+// Clean cached chunks
+for (chunkMeta <- chunkMetaList) {
+  try {
+blockManager.master.removeBlock(BlockId(chunkMeta.id))
+  } catch {
+case _: Exception => ()
+  }
+}
+throw e
+}
+
+chunkMetaList.iterator
+  }
+}
+
+class PersistDataFrameAsArrowBatchChunksPartitionEvaluatorFactory(
+schema: StructType,
+timeZoneId: String,
+errorOnDuplicatedFieldNames: Boolean
+) extends PartitionEvaluatorFactory[(Array[Byte], Long), ChunkMeta] {
+
+  def createEvaluator(): PartitionEvaluator[(Array[Byte], Long), ChunkMeta] = {
+new PersistDataFrameAsArrowBatchChunksPartitionEvaluator(
+  schema, timeZoneId, errorOnDuplicatedFieldNames
+)
+  }
+}
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+
+val maxRecordsPerBatchVal = if (maxRecordsPerBatch == -1) {
+  sparkSession.sessionState.conf.arrowMaxRecordsPerBatch
+} else {
+  maxRecordsPerBatch
+}
+val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+val errorOnDuplicatedFieldNames =
+  sparkSession.sessionState.conf.pandasStructHandlingMode == "legacy"
+val schema = dataFrame.schema
+
+val rdd = dataFrame.queryExecution.toRdd.mapPartitionsInternal { iter =>

Review Comment:
   can we do all of these in one `mapPartitionsWithEvaluator` call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-27 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436987002


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -4526,4 +4526,35 @@ class Dataset[T] private[sql](
   private[sql] def toArrowBatchRdd: RDD[Array[Byte]] = {
 toArrowBatchRdd(queryExecution.executedPlan)
   }
+
+  /** Convert to an RDD of serialized ArrowRecordBatches. */
+  private[sql] def toArrowBatchRddWithBatchRowCount(

Review Comment:
   We can remove it now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-27 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436984060


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -379,6 +380,14 @@ class SparkContext(config: SparkConf) extends Logging {
 override protected def initialValue(): Properties = new Properties()
   }
 
+  private[spark] def cachedArrowBatchServerPort: Int = {
+_cachedArrowBatchServer.get.serverSocket.getLocalPort
+  }
+
+  private[spark] def cachedArrowBatchServerSecret: String = {

Review Comment:
   should return Option?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436737380


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+val rdd = dataFrame.toArrowBatchRddWithBatchRowCount(maxRecordsPerBatch)
+val schemaJson = dataFrame.schema.json

Review Comment:
   I don't think so ? I think only class with `Serializable` trait is 
serializable but DataType classes are not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436737380


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+val rdd = dataFrame.toArrowBatchRddWithBatchRowCount(maxRecordsPerBatch)
+val schemaJson = dataFrame.schema.json

Review Comment:
   I don't think so ? I think only class with `Serializable` trait is 
serializable but DataType classes are not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436734704


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {

Review Comment:
   No.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436734212


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -99,6 +99,10 @@ class SparkEnv (
 
   private[spark] var executorBackend: Option[ExecutorBackend] = None
 
+  private[spark] var cachedArrowBatchServerPort: Option[Int] = None
+
+  private[spark] var cachedArrowBatchServerSecret: Option[String] = None

Review Comment:
   Pyspark code will call these functions, tuple type return value makes python 
code unclear (we have to write `._1()` `._2()` in python code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436448993


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+val rdd = dataFrame.toArrowBatchRddWithBatchRowCount(maxRecordsPerBatch)

Review Comment:
   We only call `toArrowBatchRddWithBatchRowCount` once. Shall we inline it? We 
split arrow batches and save them to the block manager at the same time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436448682


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+val rdd = dataFrame.toArrowBatchRddWithBatchRowCount(maxRecordsPerBatch)
+val schemaJson = dataFrame.schema.json
+val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+val errorOnDuplicatedFieldNames =
+  sparkSession.sessionState.conf.pandasStructHandlingMode == "legacy"
+rdd.mapPartitions { iter: Iterator[(Array[Byte], Long)] =>

Review Comment:
   let's use the new `rdd.mapPartitionsWithEvaluator`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436447831


##
sql/core/src/main/scala/org/apache/spark/sql/api/python/ChunkReadUtils.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId, StorageLevel}
+
+
+case class ChunkMeta(
+  id: String,
+  rowCount: Long,
+  byteCount: Long
+)
+
+object ChunkReadUtils {
+
+  def persistDataFrameAsArrowBatchChunks(
+  dataFrame: DataFrame, maxRecordsPerBatch: Int
+  ): Array[ChunkMeta] = {
+val sparkSession = SparkSession.getActiveSession.get
+val rdd = dataFrame.toArrowBatchRddWithBatchRowCount(maxRecordsPerBatch)
+val schemaJson = dataFrame.schema.json

Review Comment:
   why do we have this? DataType is serializable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436447341


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -4494,4 +4494,36 @@ class Dataset[T] private[sql](
   private[sql] def toArrowBatchRdd: RDD[Array[Byte]] = {
 toArrowBatchRdd(queryExecution.executedPlan)
   }
+
+  /** Convert to an RDD of serialized ArrowRecordBatches. */
+  private[sql] def toArrowBatchRddWithBatchRowCount(
+  maxRecordsPerBatch: Int
+  ): RDD[(Array[Byte], Long)] = {
+val plan = queryExecution.executedPlan

Review Comment:
   ```suggestion
   val rdd = queryExecution.toRdd
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436446249


##
python/pyspark/sql/chunk_api.py:
##
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from collections import namedtuple
+
+import pyarrow as pa
+
+from pyspark.rdd import _create_local_socket
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+from pyspark.serializers import read_with_length, write_with_length
+from pyspark.sql.pandas.serializers import ArrowStreamSerializer
+from pyspark.errors import PySparkRuntimeError
+
+
+ChunkMeta = namedtuple("ChunkMeta", ["id", "row_count", "byte_count"])
+
+
+def persist_dataframe_as_chunks(
+dataframe: DataFrame, max_records_per_batch: int
+) -> list[ChunkMeta]:
+"""
+Persist and materialize the spark dataframe as chunks, each chunk is an 
arrow batch.
+It tries to persist data to spark worker memory firstly, if memory is not 
sufficient,
+then it fallbacks to persist spilled data to spark worker local disk.
+Return the list of tuple (chunk_id, chunk_row_count, chunk_byte_count).
+This function is only available when it is called from spark driver 
process.
+"""
+spark = SparkSession.getActiveSession()

Review Comment:
   We can get the SparkSession instance from `dataframe`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436445341


##
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##
@@ -347,6 +348,22 @@ private[spark] class Executor(
 
   metricsPoller.start()
 
+  val cachedArrowBatchServer: Option[CachedArrowBatchServer] = if (
+SparkEnv.get.conf.get(Python.PYTHON_DATAFRAME_CHUNK_READ_ENABLED)

Review Comment:
   `Executor` has a `env` field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436444680


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {

Review Comment:
   do we copy-pastes the code of this class from somewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436444557


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  private def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+val blockManager = SparkEnv.get.blockManager

Review Comment:
   can we pass `blockManager` as a constructor parameter to 
`CachedArrowBatchServer`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436443461


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -99,6 +99,10 @@ class SparkEnv (
 
   private[spark] var executorBackend: Option[ExecutorBackend] = None
 
+  private[spark] var cachedArrowBatchServerPort: Option[Int] = None
+
+  private[spark] var cachedArrowBatchServerSecret: Option[String] = None

Review Comment:
   shall we have a single function to return a tuple2?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-26 Thread via GitHub


cloud-fan commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1436443250


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -379,6 +380,12 @@ class SparkContext(config: SparkConf) extends Logging {
 override protected def initialValue(): Properties = new Properties()
   }
 
+  private var _cachedArrowBatchServerPort: Option[Int] = None
+  private[spark] def cachedArrowBatchServerPort: Int = 
_cachedArrowBatchServerPort.get
+
+  private var _cachedArrowBatchServerSecret: Option[String] = None

Review Comment:
   Do we need the above two `var`s? Seems we can easily get it from 
`_cachedArrowBatchServer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-18 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1430810508


##
core/src/main/scala/org/apache/spark/storage/BlockId.scala:
##
@@ -185,6 +185,11 @@ private[spark] case class TestBlockId(id: String) extends 
BlockId {
   override def name: String = "test_" + id
 }
 
+private[spark] case class ArrowBatchBlockId(id: UUID) extends BlockId {
+  override def name: String = "arrow_batch_" + id
+}

Review Comment:
   I will do it in follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-15 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1427999506


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+val blockManager = SparkEnv.get.blockManager
+
+val blockData =
+  
blockManager.get[Array[Byte]](blockId).get.data.next().asInstanceOf[Array[Byte]]
+val out = new BufferedOutputStream(sock.getOutputStream())
+out.write(blockData)
+out.flush()
+  }
+
+  def start(): (Int, String) = {
+logTrace("Creating listening socket")
+
+val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+
+new Thread(s"CachedArrowBatchServer-listener") {
+  setDaemon(true)
+
+  override def run(): Unit = {
+var sock: Socket = null
+
+var connectionCount = 0
+try {
+  while (true) {
+sock = serverSocket.accept()
+connectionCount += 1
+new Thread(s"CachedArrowBatchServer-connection-$connectionCount") {

Review Comment:
   Rethink this:
   
   thread pool is for reducing thread creation overhead, for this use-case, 
because each thread handles a batch, one batch is usually 64MB or larger, so 
that thread creation overhead is negligible.
   
   On the other hand, thread-pool limits the maximum concurrency, but we don't 
want to limit the concurrency because we cannot estimate parallelism of 
requests created by 3rd-party libs.
   
   So that I think using thread is still better :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-13 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1425410174


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+val blockManager = SparkEnv.get.blockManager
+
+val blockData =
+  
blockManager.get[Array[Byte]](blockId).get.data.next().asInstanceOf[Array[Byte]]
+val out = new BufferedOutputStream(sock.getOutputStream())
+out.write(blockData)
+out.flush()
+  }
+
+  def start(): (Int, String) = {
+logTrace("Creating listening socket")
+
+val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+
+new Thread(s"CachedArrowBatchServer-listener") {
+  setDaemon(true)
+
+  override def run(): Unit = {
+var sock: Socket = null
+
+var connectionCount = 0
+try {
+  while (true) {
+sock = serverSocket.accept()
+connectionCount += 1
+new Thread(s"CachedArrowBatchServer-connection-$connectionCount") {

Review Comment:
   The connection concurrency depends on `read_chunk` calls concurrency 
dispatched to the server, it's up to how 3rd-party datasource loader use it, 
but thread pool is a good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-13 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1425407299


##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+val blockManager = SparkEnv.get.blockManager
+
+val blockData =
+  
blockManager.get[Array[Byte]](blockId).get.data.next().asInstanceOf[Array[Byte]]
+val out = new BufferedOutputStream(sock.getOutputStream())
+out.write(blockData)
+out.flush()

Review Comment:
   It closes the socket in the caller function `run`, see line 
`JavaUtils.closeQuietly(sock)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-13 Thread via GitHub


Ngone51 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1425340638


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -486,6 +493,12 @@ class SparkContext(config: SparkConf) extends Logging {
 _env = createSparkEnv(_conf, isLocal, listenerBus)
 SparkEnv.set(_env)
 
+val cachedArrowBatchServer = new CachedArrowBatchServer()

Review Comment:
   Shall we add a feature flag?



##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+dout.flush()
+  }
+
+  def handleConnection(sock: Socket): Unit = {
+val blockId = BlockId(readUtf8(sock))
+assert(blockId.isInstanceOf[ArrowBatchBlockId])
+
+val blockManager = SparkEnv.get.blockManager
+
+val blockData =
+  
blockManager.get[Array[Byte]](blockId).get.data.next().asInstanceOf[Array[Byte]]
+val out = new BufferedOutputStream(sock.getOutputStream())
+out.write(blockData)
+out.flush()

Review Comment:
   ```suggestion
   out.flush()
   out.close()
   ```



##
core/src/main/scala/org/apache/spark/api/python/CachedArrowBatchServer.scala:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.{InetAddress, ServerSocket, Socket}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.SocketAuthHelper
+import org.apache.spark.storage.{ArrowBatchBlockId, BlockId}
+
+
+class CachedArrowBatchServer extends Logging {
+
+  val serverSocket = new ServerSocket(
+0, 1, InetAddress.getLoopbackAddress()
+  )
+
+  protected def readUtf8(s: Socket): String = {
+val din = new DataInputStream(s.getInputStream())
+val len = din.readInt()
+val bytes = new Array[Byte](len)
+din.readFully(bytes)
+new String(bytes, UTF_8)
+  }
+
+  protected def writeUtf8(str: String, s: Socket): Unit = {
+val bytes = str.getBytes(UTF_8)
+val dout = new DataOutputStream(s.getOutputStream())
+dout.writeInt(bytes.length)
+dout.write(bytes, 0, bytes.length)
+

Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-13 Thread via GitHub


WeichenXu123 commented on code in PR #44294:
URL: https://github.com/apache/spark/pull/44294#discussion_r1425190857


##
core/src/main/scala/org/apache/spark/storage/BlockId.scala:
##
@@ -185,6 +185,11 @@ private[spark] case class TestBlockId(id: String) extends 
BlockId {
   override def name: String = "test_" + id
 }
 
+private[spark] case class ArrowBatchBlockId(id: UUID) extends BlockId {
+  override def name: String = "arrow_batch_" + id
+}

Review Comment:
   TODO: add a new RDD cache decommission migration type and config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46361][PYTHON][CORE] Spark dataset chunk read api [spark]

2023-12-12 Thread via GitHub


WeichenXu123 commented on PR #44294:
URL: https://github.com/apache/spark/pull/44294#issuecomment-1851481041

   Manual testing passed. The PR is ready for initial pass review. I will add 
unit test later. CC @Ngone51 @cloud-fan @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org