Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


HeartSaVioR closed pull request #45023: [SPARK-46962][SS][PYTHON] Add interface 
for python streaming data source API and implement python worker to run python 
streaming data source
URL: https://github.com/apache/spark/pull/45023





Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on PR #45023:
URL: https://github.com/apache/spark/pull/45023#issuecomment-1990058421

   Thanks! Merging to master.





Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520802905


##
python/pyspark/sql/datasource.py:
##
@@ -426,6 +426,10 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 in the final DataFrame.
 """
 ...
+raise PySparkNotImplementedError(

Review Comment:
   @chaoqin-li1123 
   nit: if it's not needed, let's remove @abstractmethod from the method definition.
   Please do this as a follow-up PR. I'd like to unblock this PR so you can move forward.
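   
   A minimal sketch of what that follow-up might look like, assuming the default body keeps the explicit `PySparkNotImplementedError` from the diff above (the class and method names here are illustrative, not the exact location of the hunk):
   
   ```python
   from abc import ABC
   
   from pyspark.errors import PySparkNotImplementedError
   
   
   class StreamReaderSketch(ABC):
       # Without @abstractmethod, subclasses are not forced to override read();
       # the explicit error below is raised only if an unimplemented read() is
       # actually invoked.
       def read(self, partition):
           raise PySparkNotImplementedError(
               error_class="NOT_IMPLEMENTED",
               message_parameters={"feature": "read"},
           )
   ```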






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520455477


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+dataOut.flush()
+
+dataIn = new 
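
The scaladoc quoted above describes the protocol: the JVM writes one of the function IDs from `PythonStreamingSourceRunner`, and the Python worker invokes the matching `DataSourceStreamReader` method and writes the result back over the socket. A rough, hypothetical sketch of that Python-side loop (the I/O helpers are taken as parameters here; this is not the actual `pyspark.sql.streaming.python_streaming_source_runner` module):

```python
import json
from typing import Callable

# Function IDs mirroring the PythonStreamingSourceRunner object above.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885


def serve(reader, read_int: Callable[[], int], write_utf: Callable[[str], None]) -> None:
    """Poll function IDs sent by the JVM and dispatch to the stream reader."""
    while True:
        func_id = read_int()  # blocks until the JVM requests the next call
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_utf(json.dumps(reader.initialOffset()))
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_utf(json.dumps(reader.latestOffset()))
        # partitions() and commit() would follow the same pattern, reading
        # their start/end offsets from the socket before dispatching.
```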


Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520450074


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def latestOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:

Review Comment:
   Yes, let's make it `Iterator[Tuple]`.
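   
   A sketch of the revised signature, with a reader class name that is purely illustrative:
   
   ```python
   from typing import Iterator, Tuple
   
   from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
   
   
   class MyStreamReader(DataSourceStreamReader):
       def read(self, partition: InputPartition) -> Iterator[Tuple]:
           # Yielding plain tuples (or Row objects, which subclass tuple)
           # satisfies the narrowed annotation.
           yield (0, partition.value)
   ```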






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520368942


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"offset": {"partition-1": 0}}
+  |def latestOffset(self):
+  |return {"offset": {"partition-1": 2}}
+  |def partitions(self, start: dict, end: dict):
+  |start_index = start["offset"]["partition-1"]
+  |end_index = end["offset"]["partition-1"]
+  |return [InputPartition(i) for i in range(start_index, 
end_index)]
+  |def commit(self, end: dict):
+  |1 + 2
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  protected def errorDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class ErrorDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |raise Exception("error reading initial offset")
+  |def latestOffset(self):
+  |raise Exception("error reading latest offset")
+  |def partitions(self, start: dict, end: dict):
+  |raise Exception("error planning partitions")
+  |def commit(self, end: dict):
+  |raise Exception("error committing offset")
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  private val errorDataSourceName = "ErrorDataSource"
+
+  test("simple data stream source") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |$simpleDataStreamReaderScript
+ |
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |return SimpleDataStreamReader()
+ |""".stripMargin
+val inputSchema = StructType.fromDDL("input BINARY")
+
+val dataSource = createUserDefinedPythonDataSource(dataSourceName, 
dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, dataSource)
+val pythonDs = new PythonDataSourceV2
+pythonDs.setShortName("SimpleDataSource")
+val stream = new PythonMicroBatchStream(
+  pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
+
+val initialOffset = stream.initialOffset()
+assert(initialOffset.json == "{\"offset\": {\"partition-1\": 0}}")
+for (_ <- 1 to 50) {

Review Comment:
   It is a stress test to prove that the RPC calls can be invoked many times.



##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520229021


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.

Review Comment:
   They are just discouraged; list and float are also supposed to work.
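   
   A minimal sketch, assuming offsets are serialized with Python's standard `json` module (the values below are hypothetical):
   
   ```python
   import json
   
   # An offset using types beyond int/str/bool; anything the json module can
   # round-trip survives the exchange, even if it is not called out in the docs.
   offset = {
       "partition-1": {"index": 3, "closed": True},
       "partition-2": {"index": 5, "watermark": 12.5, "ids": [1, 2, 3]},
   }
   assert json.loads(json.dumps(offset)) == offset
   ```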






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1519254087


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.

Review Comment:
   Say `str, int, bool`, since these are the actual Python type names?
   https://docs.python.org/3/library/json.html#json.dump
   
   Also, do we disallow other types, or are they just discouraged (on our side) so we don't document them? The JSON module in the Python standard library supports more types.
   https://docs.python.org/3/library/json.html#json.JSONDecoder



##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes

Review Comment:
   ditto



##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"offset": {"partition-1": 0}}
+  |def latestOffset(self):
+  |return {"offset": {"partition-1": 2}}
+  |def partitions(self, start: dict, end: dict):
+  

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-08 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1518252566


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)

Review Comment:
   Thanks for pointing it out; I removed the redundant conf reset.






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-08 Thread via GitHub


WweiL commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1518173173


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+dataOut.flush()
+
+dataIn = new 


Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-08 Thread via GitHub


WweiL commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1518165666


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)

Review Comment:
   Yes, I agree we don't need this temporary set-reset here. This logic was in the Connect StreamingPythonRunner because of a concurrent development issue. I'll also create a PR to remove it there.






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-08 Thread via GitHub


HyukjinKwon commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517362978


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def latestOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:

Review Comment:
   It has to be `Iterator[Tuple]`. We should fix the batch side too; `Row` inherits `tuple`. Mind fixing it here together? cc @allisonwang-db
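   
   A quick sketch of why `Iterator[Tuple]` already covers readers that yield `Row`:
   
   ```python
   from pyspark.sql import Row
   
   # Row is implemented as a tuple subclass, so yielding Row objects still
   # satisfies an Iterator[Tuple] annotation.
   row = Row(id=0, value="a")
   assert isinstance(row, tuple)
   ```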






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-08 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517359698


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+dataOut.flush()
+
+dataIn = new 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517356903


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+dataOut.flush()
+
+dataIn = new 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517356072


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def latestOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:

Review Comment:
   I copied the interface from the batch Python data source. Why is this `Iterator[Union]` instead of `Union[Iterator]`? @HyukjinKwon
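
   For reference, a minimal typing sketch (not from the PR) of the distinction being asked about: `Iterator[Union[Tuple, Row]]` is one iterator whose elements may each be either a tuple or a Row, whereas `Union[Iterator[Tuple], Iterator[Row]]` would require the whole iterator to yield only one of the two kinds.

from typing import Iterator, Tuple, Union
from pyspark.sql import Row

def read_mixed() -> Iterator[Union[Tuple, Row]]:
    # A single iterator that may yield tuples and Rows interchangeably.
    yield (0, "a")
    yield Row(id=1, value="b")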



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517351498


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {

Review Comment:
   Could either @rangadi or @WweiL take a look? This is somewhat similar to foreachBatch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517348253


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   The IDE doesn't seem to catch this case. As Jungtaek said, these checks happen at runtime, so it'd be difficult to prevent this statically.
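
   A small illustration (not part of the PR) of why this is a runtime rather than a static check, assuming plain `abc` behavior and made-up class names:

from abc import ABC, abstractmethod

class StreamReaderBase(ABC):
    @abstractmethod
    def initialOffset(self) -> dict:
        ...

class ForgetfulReader(StreamReaderBase):
    pass  # initialOffset is not implemented

# Nothing complains until the class is instantiated, at runtime.
try:
    ForgetfulReader()
except TypeError as e:
    print(e)  # Can't instantiate abstract class ForgetfulReader ...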



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517209325


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   Python will always fail at runtime either way, so it's really a question of which error message is easier to understand...
   
   But, I don't know, the IDE could come to the rescue. @HyukjinKwon Will the IDE be able to point this out if we define this as abstract and the implementation class does not implement it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1516609093


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)

Review Comment:
   Do we need to set this config? `workerFactory.createSimpleWorker(blockingMode = true)` already runs without daemon mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


sahnib commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1516471532


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   Should we make this abstract to force the user to implement it? 



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-04 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1511914150


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   We agreed offline that the offset interface should be a dict, or a recursive dict, whose keys and values are primitive types.
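
   For instance (a sketch following the docstring examples), a flat or nested offset whose leaf values are all primitives:

# A flat offset
offset = {"partition-1": "3"}

# A recursive offset; every leaf value is an int, str, or bool
offset = {
    "partition-1": {"index": 3, "closed": True},
    "partition-2": {"index": 5},
}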



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-23 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1501274208


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.

Review Comment:
   We use a dict here because we want to ensure the object is JSON serializable.
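
   A quick sketch of why that matters: the offset travels between the JVM and the Python worker as JSON, along the lines of the json.dumps/json.loads calls in python_streaming_source_runner.py, so it has to round-trip cleanly:

import json

offset = {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}

# Serialized before being written to the socket ...
payload = json.dumps(offset).encode("utf-8")
# ... and parsed back on the other side.
assert json.loads(payload) == offset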



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-23 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1501273741


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict

Review Comment:
   Changed to a dict, or a recursive dict, whose keys and values must be primitive types.



##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+

Review Comment:
   Example added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-16 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1492090151


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import json
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource, DataSourceStreamReader
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None:
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) 
-> None:
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> 
None:
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)
+schema = _parse_datatype_json_string(schema_json)
+if not isinstance(schema, StructType):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "an output schema of type 'StructType'",
+"actual": f"'{type(schema).__name__}'",
+},
+)
+
+# Instantiate data source reader.
+try:
+reader = data_source.streamReader(schema=schema)
+# Initialization succeed.
+write_int(0, outfile)
+outfile.flush()
+
+# handle method call from socket
+while True:
+func_id = read_int(infile)
+if func_id == initial_offset_func_id:
+initial_offset_func(reader, outfile)
+elif func_id == latest_offset_func_id:
+latest_offset_func(reader, outfile)
+elif func_id == partitions_func_id:
+partitions_func(reader, infile, outfile)
+elif func_id == 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1491869371


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+// Send configurations
+dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch)
+dataOut.flush()
+
+dataIn = new DataInputStream(
+  new 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1491663766


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition) -> Iterator[Union[Tuple, Row]]:
+"""
+Generates data for a given partition and returns an iterator of tuples 
or rows.
+
+This method is invoked once per partition to read the data. 
Implementing
+this method is required for stream reader. You can initialize any
+non-serializable resources required for reading data from the data 
source
+within this method.
+This method is static and stateless. You shouldn't access mutable 
class member
+or keep in memory state between different invocations of read().
+
+Parameters
+--
+partition : object
+The partition to read. It must be one of the partition values 
returned by
+``partitions()``.
+
+Returns
+---
+Iterator[Tuple] or Iterator[Row]
+An iterator of tuples or rows. Each tuple or row will be converted 
to a row
+in the final DataFrame.
+"""
+...
+
+def commit(self, end: dict):

Review Comment:
   Return type added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1491464370


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"offset": "0"}
+  |def latestOffset(self):
+  |return {"offset": "2"}
+  |def partitions(self, start: dict, end: dict):
+  |return [InputPartition(i) for i in range(int(start["offset"]))]
+  |def commit(self, end: dict):
+  |1 + 2
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  protected def errorDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class ErrorDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |raise Exception("error reading initial offset")
+  |def latestOffset(self):
+  |raise Exception("error reading latest offset")
+  |def partitions(self, start: dict, end: dict):
+  |raise Exception("error planning partitions")
+  |def commit(self, end: dict):
+  |raise Exception("error committing offset")
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  private val errorDataSourceName = "ErrorDataSource"
+
+  test("simple data stream source") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |$simpleDataStreamReaderScript
+ |
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |return SimpleDataStreamReader()
+ |""".stripMargin
+val inputSchema = StructType.fromDDL("input BINARY")
+
+val dataSource = createUserDefinedPythonDataSource(dataSourceName, 
dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, dataSource)
+val pythonDs = new PythonDataSourceV2
+pythonDs.setShortName("SimpleDataSource")
+val stream = new PythonMicroBatchStream(
+  pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
+
+val initialOffset = stream.initialOffset()
+assert(initialOffset.json == "{\"offset\": \"0\"}")
+for (_ <- 1 to 50) {
+  val offset = stream.latestOffset()
+  assert(offset.json == "{\"offset\": \"2\"}")
+  assert(stream.planInputPartitions(offset, offset).size == 2)
+  stream.commit(offset)
+}
+stream.stop()

Review Comment:
   read will be implemented in the next PR; it runs on the executors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1491464124


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import json
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()

Review Comment:
   We raise a PySparkNotImplementedError if it is not implemented.
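
   Roughly what that looks like with the default implementations in datasource.py (a sketch; ForgottenOffsetReader is a made-up subclass, and the import path assumes the classes added in this PR):

from pyspark.errors import PySparkNotImplementedError
from pyspark.sql.datasource import DataSourceStreamReader

class ForgottenOffsetReader(DataSourceStreamReader):
    def read(self, partition):
        yield (0,)

reader = ForgottenOffsetReader()
try:
    reader.initialOffset()
except PySparkNotImplementedError as e:
    print(e)  # error class NOT_IMPLEMENTED for feature 'initialOffset'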



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1491463836


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition) -> Iterator[Union[Tuple, Row]]:
+"""
+Generates data for a given partition and returns an iterator of tuples 
or rows.
+
+This method is invoked once per partition to read the data. 
Implementing
+this method is required for stream reader. You can initialize any
+non-serializable resources required for reading data from the data 
source
+within this method.
+This method is static and stateless. You shouldn't access mutable 
class member
+or keep in memory state between different invocations of read().
+
+Parameters
+--
+partition : object
+The partition to read. It must be one of the partition values 
returned by
+``partitions()``.
+
+Returns
+---
+Iterator[Tuple] or Iterator[Row]
+An iterator of tuples or rows. Each tuple or row will be converted 
to a row
+in the final DataFrame.
+"""
+...
+
+def commit(self, end: dict):

Review Comment:
   This function is not supposed to return anything; how do we express that in Python?
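
   (For reference, the usual way to express this in Python is a `-> None` return type annotation; a hedged sketch with a hypothetical reader class, not the PR's code, is below.)
   ```
   from pyspark.sql.datasource import DataSourceStreamReader


   class MyStreamReader(DataSourceStreamReader):  # hypothetical reader, for illustration only
       def read(self, partition):
           yield (0,)

       def commit(self, end: dict) -> None:
           # "Returns nothing" is expressed with the ``-> None`` annotation;
           # the body here is an illustrative no-op.
           pass
   ```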



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-15 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490557068


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict

Review Comment:
   Nit: Dict[str, str]? Is this case-sensitive or case-insensitive?



##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import json
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()

Review Comment:
   What if the initialOffset is not implemented?



##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns

Review Comment:
   ditto for examples



##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+

Review Comment:
   Could you also provide an example of what the dictionary looks like?
   ```
   Examples
   -
   ...
   ```
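
   For illustration only (hypothetical shard names, not text from the PR), such an offset dictionary could look like:
   ```
   # Hypothetical offsets for a made-up source with two shards. Keys and values
   # are plain strings, so the dict round-trips through json.dumps/json.loads unchanged.
   initial_offset = {"shard-0": "0", "shard-1": "0"}
   latest_offset = {"shard-0": "1042", "shard-1": "997"}
   ```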



##

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490256089


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)

Review Comment:
   We only use data source schema when user doesn't specify schema.
   
https://github.com/apache/spark/blob/736d8ab3f00e7c5ba1b01c22f6398b636b8492ea/python/pyspark/sql/worker/create_data_source.py#L144



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490215001


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"0": "2"}
+  |def latestOffset(self):
+  |return {"0": "2"}

Review Comment:
   Nice suggestion, changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490214832


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"0": "2"}
+  |def latestOffset(self):
+  |return {"0": "2"}
+  |def partitions(self, start: dict, end: dict):
+  |return [InputPartition(i) for i in range(int(start["0"]))]
+  |def commit(self, end: dict):
+  |1 + 2
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  protected def errorDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class ErrorDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |raise Exception("error reading initial offset")
+  |def latestOffset(self):
+  |raise Exception("error reading latest offset")
+  |def partitions(self, start: dict, end: dict):
+  |raise Exception("error planning partitions")
+  |def commit(self, end: dict):
+  |raise Exception("error committing offset")
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  private val errorDataSourceName = "ErrorDataSource"
+
+  test("simple data stream source") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |$simpleDataStreamReaderScript
+ |
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |return SimpleDataStreamReader()
+ |""".stripMargin
+val inputSchema = StructType.fromDDL("input BINARY")
+
+val dataSource = createUserDefinedPythonDataSource(dataSourceName, 
dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, dataSource)
+val pythonDs = new PythonDataSourceV2
+pythonDs.setShortName("SimpleDataSource")
+val stream = new PythonMicroBatchStream(
+  pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
+
+val initialOffset = stream.initialOffset()
+assert(initialOffset.json == "{\"0\": \"2\"}")
+for (_ <- 1 to 50) {
+  val offset = stream.latestOffset()
+  assert(offset.json == "{\"0\": \"2\"}")
+  assert(stream.planInputPartitions(offset, offset).size == 2)
+  stream.commit(offset)
+}
+stream.stop()
+  }
+
+  test("Error creating stream reader") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |raise Exception("error creating stream reader")
+ |""".stripMargin
+val dataSource = createUserDefinedPythonDataSource(
+  name = dataSourceName, pythonScript = dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490214280


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods of a Python DataSourceStreamReader from the JVM.
+ * A runner spawns a Python worker process. In the main function, it sets up communication
+ * between the JVM and the Python process through a socket and creates a DataSourceStreamReader instance.
+ * In an infinite loop, the Python worker process polls information (function name and parameters)
+ * from the socket, invokes the corresponding method of the StreamReader, and sends the return value to the JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+// Send configurations
+dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch)
+dataOut.flush()
+
+dataIn = new DataInputStream(
+  new 
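
The Scala proxy above pairs with a Python worker loop on the other side of the socket. A simplified sketch of that dispatch loop is below; it is an assumed structure pieced together from the function IDs and helper functions in python_streaming_source_runner.py quoted elsewhere in this thread, not the PR's exact code, and it is a fragment rather than a standalone script:

```
# Simplified sketch of the worker-side dispatch loop the JVM proxy talks to:
# read a function id sent by the JVM, invoke the matching DataSourceStreamReader
# method via the helper functions, write the response, and repeat.
# Assumes read_int plus the *_func helpers and *_func_id constants from the
# quoted python_streaming_source_runner.py module are in scope.
def dispatch_loop(reader, infile, outfile):
    while True:
        func_id = read_int(infile)                    # which method the JVM wants to call
        if func_id == initial_offset_func_id:
            initial_offset_func(reader, outfile)      # writes reader.initialOffset() as JSON
        elif func_id == latest_offset_func_id:
            latest_offset_func(reader, outfile)       # writes reader.latestOffset() as JSON
        elif func_id == partitions_func_id:
            partitions_func(reader, infile, outfile)  # reads offsets, writes pickled partitions
        elif func_id == commit_func_id:
            commit_func(reader, infile, outfile)      # reads end offset, calls reader.commit()
        outfile.flush()                               # push the response back to the JVM
```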

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490213270


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)
+schema = _parse_datatype_json_string(schema_json)
+if not isinstance(schema, StructType):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "an output schema of type 'StructType'",
+"actual": f"'{type(schema).__name__}'",
+},
+)
+
+# Receive the configuration values.

Review Comment:
   Removed. It is no longer necessary because the pickled read function will be created by another Python worker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: 

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490211705


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)
+schema = _parse_datatype_json_string(schema_json)
+if not isinstance(schema, StructType):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "an output schema of type 'StructType'",
+"actual": f"'{type(schema).__name__}'",
+},
+)
+
+# Receive the configuration values.
+max_arrow_batch_size = read_int(infile)
+assert max_arrow_batch_size > 0, (
+"The maximum arrow batch size should be greater than 0, but got "
+f"'{max_arrow_batch_size}'"
+)
+
+# Instantiate data source reader.
+try:
+reader = data_source.streamReader(schema=schema)
+# Initialization succeed.
+write_int(0, outfile)
+outfile.flush()
+
+# handle method call from socket
+while True:
+func_id = read_int(infile)
+

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490134034


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   UPDATE: we are discussing this offline now and it could take time - this is tied to the interface and extremely hard to change after it is shipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490111896


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   That essentially means we need to add the deserializeOffset() interface to 
the StreamReader for deserialization. Or is there any alternative?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Maybe yes. I'm OK with dict as long as we clearly explain which types can be used as values (on the flip side, we still need to restrict the types since we have to convert this to JSON), but it's probably less of a headache to let the data source implementation take care of it itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Maybe yes. I'm OK with dict as long as we clearly explain which types can be used as values, but it's probably less of a headache to let the data source implementation take care of it itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490107780


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   So the data source implementation provides the serde between its offset model and JSON, and the Python worker will handle it with the serde being provided. For the Python worker <-> JVM boundary, JSON is used. Does this make sense?
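
   A hedged sketch of that shape (hypothetical class names, not an API from this PR): the data source owns its offset class plus the JSON serde, and the worker <-> JVM boundary only ever exchanges the JSON string.
   ```
   import json
   from dataclasses import dataclass
   from typing import List


   @dataclass
   class ShardPosition:              # hypothetical offset element, for illustration only
       shard: str
       position: int


   @dataclass
   class MyOffset:                   # hypothetical offset model owned by the data source
       shards: List[ShardPosition]

       def json(self) -> str:
           # Data-source-owned serialization to the JSON string exchanged with the JVM.
           return json.dumps([{"shard": s.shard, "position": s.position} for s in self.shards])

       @classmethod
       def from_json(cls, text: str) -> "MyOffset":
           # Data-source-owned deserialization back into its own offset model.
           return cls([ShardPosition(d["shard"], d["position"]) for d in json.loads(text)])
   ```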



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490106637


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Does that mean we also need to add deserializeOffset() to the python 
interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490104863


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   We expect the data source to know about its offset model - we pass the JSON form of the offset to the specific data source, which is expected to deserialize it successfully into its offset model. It becomes a runtime error if the JSON form of the offset and the offset model aren't compatible, which I think is the same thing the Scala side does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1489903039


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   The serialization is easy, but deserialization will be tricky. We don't have runtime type information to deserialize plain JSON text back into a Python object (because we don't even know which Python type the JSON was serialized from unless we keep extra type information in the JSON).
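
   A small illustration of the point (made-up offset class, standard library only): a JSON round trip only gives back plain dict/list/str/int/float/bool/None values, so the original Python type is lost unless extra type information is embedded.
   ```
   import json


   class KafkaLikeOffset:  # hypothetical offset type, purely for illustration
       def __init__(self, partition_positions):
           self.partition_positions = partition_positions


   offset = KafkaLikeOffset({"topic-0": 42})
   text = json.dumps(offset.__dict__)     # serializing is straightforward
   restored = json.loads(text)            # ...but we only get a plain dict back
   print(type(restored).__name__)         # prints 'dict', not 'KafkaLikeOffset'
   ```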



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1489279083


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)

Review Comment:
   What is the expected behavior if the user does not specify the schema and relies on the schema of the data source, i.e. inference (which applies to the majority of data sources)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1489259159


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   I just realized this is too limited - there are data sources whose offset structure is a complex model, e.g. an offset model consisting of a list whose elements are case class instances. That probably couldn't be bound to a flattened dictionary.

   Shall we revisit the design for the structure of the offset? Probably the closest approach to the proposed change is to simply allow dict, while noting that we serialize the dict to JSON underneath via the following approach (hence the dict has to be compatible).
   https://docs.python.org/3/library/json.html#py-to-json-table

   Or just introduce an interface for Offset which is expected to handle JSON serde by itself. We could probably document an example of how to implement JSON serde for a complex offset type.
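
   To make the first alternative concrete, a hedged example (made-up source, not from the PR) of an offset that stays a dict but is nested, using only types from the py-to-json table so it serializes cleanly:
   ```
   import json

   # A nested offset for a hypothetical multi-topic source: values are lists of
   # dicts rather than flat strings, but every element maps cleanly to JSON.
   offset = {
       "topics": [
           {"name": "orders", "partition": 0, "position": 1042},
           {"name": "orders", "partition": 1, "position": 997},
       ]
   }
   assert json.loads(json.dumps(offset)) == offset  # round-trips losslessly
   ```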



##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-13 Thread via GitHub


HeartSaVioR commented on PR #45023:
URL: https://github.com/apache/spark/pull/45023#issuecomment-1943113395

   Could you please check the GA build result and fix accordingly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org