Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR closed pull request #45023: [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source
URL: https://github.com/apache/spark/pull/45023

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on PR #45023:
URL: https://github.com/apache/spark/pull/45023#issuecomment-1990058421

Thanks! Merging to master.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520802905


## python/pyspark/sql/datasource.py: ##
@@ -426,6 +426,10 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
         in the final DataFrame.
         """
         ...
+        raise PySparkNotImplementedError(

Review Comment:
   @chaoqin-li1123 nit: if not needed, let's remove `@abstractmethod` from the method definition. Please do this as a follow-up PR. I'd like to unblock this PR to let you move forward.
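The nit concerns a method that carries both `@abstractmethod` and a body that raises. The two mechanisms fail at different times: `@abstractmethod` makes instantiating a non-overriding subclass raise `TypeError`, whereas a raising body only fails when the method is actually called. A standalone sketch using only the standard library, where `NotImplementedError` stands in for `PySparkNotImplementedError` and all class names are hypothetical:

```python
from abc import ABC, abstractmethod


class StrictReader(ABC):
    # abstractmethod: a subclass that does not override read() cannot be instantiated.
    @abstractmethod
    def read(self, partition):
        ...


class LenientReader(ABC):
    # Plain method: instantiation succeeds; the error surfaces only when read() is called.
    def read(self, partition):
        raise NotImplementedError("read")


class IncompleteStrict(StrictReader):
    pass


class IncompleteLenient(LenientReader):
    pass


def try_instantiate(cls):
    """Return 'ok' if cls() succeeds, 'TypeError' if abstract methods block it."""
    try:
        cls()
        return "ok"
    except TypeError:
        return "TypeError"


def try_call_read(obj):
    """Return 'ok' if obj.read() succeeds, 'NotImplementedError' otherwise."""
    try:
        obj.read(None)
        return "ok"
    except NotImplementedError:
        return "NotImplementedError"


print(try_instantiate(IncompleteStrict))   # TypeError
print(try_instantiate(IncompleteLenient))  # ok
print(try_call_read(IncompleteLenient()))  # NotImplementedError
```

Keeping both on one method is therefore redundant: the decorator already forces subclasses to override the method, so the `raise` in the base body only runs if a subclass explicitly calls `super().read(...)`.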
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520455477


## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one of the
+  // integers below, it will invoke the corresponding function of StreamReader instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM.
+ * A runner spawns a python worker process. In the main function, set up communication
+ * between JVM and python process through socket and create a DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+    func: PythonFunction,
+    outputSchema: StructType) extends Logging {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+    logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+    val env = SparkEnv.get
+
+    val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
+    envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+    envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+    envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+    val prevConf = conf.get(PYTHON_USE_DAEMON)
+    conf.set(PYTHON_USE_DAEMON, false)
+    try {
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)
+      pythonWorker = Some(worker)
+      pythonWorkerFactory = Some(workerFactory)
+    } finally {
+      conf.set(PYTHON_USE_DAEMON, prevConf)
+    }
+
+    val stream = new BufferedOutputStream(
+      pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+    dataOut = new DataOutputStream(stream)
+
+    PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+    val pythonIncludes = func.pythonIncludes.asScala.toSet
+    PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut)
+
+    // Send the user function to python process
+    PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+    // Send output schema
+    PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+    dataOut.flush()
+
+    dataIn = new DataIn
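The runner's doc comment describes a request/response loop: the JVM writes an integer function ID (884 to 887) to the socket, and the Python worker invokes the matching `DataSourceStreamReader` method and writes the result back. A minimal sketch of such a dispatch loop follows. Only the four ID values come from the code above; the big-endian 4-byte ints (what `java.io.DataOutputStream.writeInt` emits) and the length-prefixed JSON payloads are assumptions, the end-of-input sentinel and all helper names are hypothetical, and this is not the actual `python_streaming_source_runner` module.

```python
import io
import json
import struct

# IDs copied from PythonStreamingSourceRunner; everything else here is illustrative.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885
PARTITIONS_FUNC_ID = 886
COMMIT_FUNC_ID = 887
END_OF_INPUT = -1  # sentinel used only by this sketch


def read_int(stream) -> int:
    # java.io.DataOutputStream.writeInt emits 4 bytes, big-endian.
    data = stream.read(4)
    if len(data) < 4:
        return END_OF_INPUT
    return struct.unpack(">i", data)[0]


def write_int(value: int, stream) -> None:
    stream.write(struct.pack(">i", value))


def read_json(stream):
    # Assumed framing: 4-byte length, then UTF-8 encoded JSON.
    length = read_int(stream)
    return json.loads(stream.read(length).decode("utf-8"))


def write_json(obj, stream) -> None:
    data = json.dumps(obj).encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def serve(reader, infile, outfile) -> None:
    """Invoke the reader method matching each function ID until input runs out."""
    while True:
        func_id = read_int(infile)
        if func_id == END_OF_INPUT:
            return
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_json(reader.initialOffset(), outfile)
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_json(reader.latestOffset(), outfile)
        elif func_id == PARTITIONS_FUNC_ID:
            start, end = read_json(infile), read_json(infile)
            # Simplification: reply with the partition count only.
            write_int(len(reader.partitions(start, end)), outfile)
        elif func_id == COMMIT_FUNC_ID:
            reader.commit(read_json(infile))
            write_int(0, outfile)  # acknowledgement
        else:
            raise ValueError(f"unknown function id {func_id}")


class CountingReader:
    # Hypothetical reader used only to drive the loop.
    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        return {"offset": 2}

    def partitions(self, start, end):
        return list(range(start["offset"], end["offset"]))

    def commit(self, end):
        pass


# Simulate the JVM side: ask for the latest offset, then plan partitions.
requests = io.BytesIO()
write_int(LATEST_OFFSET_FUNC_ID, requests)
write_int(PARTITIONS_FUNC_ID, requests)
write_json({"offset": 0}, requests)
write_json({"offset": 2}, requests)
requests.seek(0)

responses = io.BytesIO()
serve(CountingReader(), requests, responses)
responses.seek(0)
print(read_json(responses))  # {'offset': 2}
print(read_int(responses))   # 2
```

Keeping one long-lived worker and multiplexing calls by ID avoids re-sending the pickled reader on every call, which is what makes the repeated-call stress test in the suite meaningful.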
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520450074


## python/pyspark/sql/datasource.py: ##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
         ...


+class DataSourceStreamReader(ABC):
+    """
+    A base class for streaming data source readers. Data source stream readers are responsible
+    for outputting data from a streaming data source.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+
+        ...
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def latestOffset(self) -> dict:
+        """
+        Returns the most recent offset available.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def latestOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+        ...
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "latestOffset"},
+        )
+
+    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+        """
+        Returns a list of InputPartition given the start and end offsets. Each InputPartition
+        represents a data split that can be processed by one Spark task.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset of the microbatch to plan partitioning.
+        end : dict
+            The end offset of the microbatch to plan partitioning.
+
+        Returns
+        -------
+        Sequence[InputPartition]
+            A sequence of partitions for this data source. Each partition value
+            must be an instance of `InputPartition` or a subclass of it.
+        """
+        ...
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "partitions"},
+        )
+
+    @abstractmethod
+    def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:

Review Comment:
   Yes, let's make it `Iterator[Tuple]`.
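Following the interface quoted above and the agreement to type `read` as `Iterator[Tuple]`, a minimal reader might look like the sketch below. To stay runnable without PySpark, it defines a stand-in `InputPartition` (the test script in this PR imports the real one from `pyspark.sql.datasource` and reads `partition.value`) and does not subclass `DataSourceStreamReader`. The `RangeStreamReader` name and its offset layout are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterator, Sequence, Tuple


@dataclass
class InputPartition:
    # Stand-in for pyspark.sql.datasource.InputPartition, which wraps a value.
    value: int


class RangeStreamReader:
    """Hypothetical reader that emits one partition per integer offset step."""

    def __init__(self, rows_available: int):
        self.rows_available = rows_available

    def initialOffset(self) -> dict:
        # Offsets are plain JSON-friendly dicts (str keys, int values).
        return {"partition-1": 0}

    def latestOffset(self) -> dict:
        return {"partition-1": self.rows_available}

    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
        # One InputPartition per offset in the half-open range [start, end).
        return [InputPartition(i) for i in range(start["partition-1"], end["partition-1"])]

    def read(self, partition: InputPartition) -> Iterator[Tuple]:
        # Yield plain tuples, matching the Iterator[Tuple] return type.
        yield (partition.value, partition.value * 10)

    def commit(self, end: dict) -> None:
        # Nothing to release in this sketch.
        pass


reader = RangeStreamReader(rows_available=3)
start, end = reader.initialOffset(), reader.latestOffset()
rows = [row for p in reader.partitions(start, end) for row in reader.read(p)]
print(rows)  # [(0, 0), (1, 10), (2, 20)]
```

Returning bare tuples keeps `read` independent of `Row`, which is the simplification the comment above settles on.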
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520368942


## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
+import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+    """
+      |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
+      |
+      |class SimpleDataStreamReader(DataSourceStreamReader):
+      |    def initialOffset(self):
+      |        return {"offset": {"partition-1": 0}}
+      |    def latestOffset(self):
+      |        return {"offset": {"partition-1": 2}}
+      |    def partitions(self, start: dict, end: dict):
+      |        start_index = start["offset"]["partition-1"]
+      |        end_index = end["offset"]["partition-1"]
+      |        return [InputPartition(i) for i in range(start_index, end_index)]
+      |    def commit(self, end: dict):
+      |        1 + 2
+      |    def read(self, partition):
+      |        yield (0, partition.value)
+      |        yield (1, partition.value)
+      |        yield (2, partition.value)
+      |""".stripMargin
+
+  protected def errorDataStreamReaderScript: String =
+    """
+      |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
+      |
+      |class ErrorDataStreamReader(DataSourceStreamReader):
+      |    def initialOffset(self):
+      |        raise Exception("error reading initial offset")
+      |    def latestOffset(self):
+      |        raise Exception("error reading latest offset")
+      |    def partitions(self, start: dict, end: dict):
+      |        raise Exception("error planning partitions")
+      |    def commit(self, end: dict):
+      |        raise Exception("error committing offset")
+      |    def read(self, partition):
+      |        yield (0, partition.value)
+      |        yield (1, partition.value)
+      |        yield (2, partition.value)
+      |""".stripMargin
+
+  private val errorDataSourceName = "ErrorDataSource"
+
+  test("simple data stream source") {
+    assume(shouldTestPandasUDFs)
+    val dataSourceScript =
+      s"""
+         |from pyspark.sql.datasource import DataSource
+         |$simpleDataStreamReaderScript
+         |
+         |class $dataSourceName(DataSource):
+         |    def streamReader(self, schema):
+         |        return SimpleDataStreamReader()
+         |""".stripMargin
+    val inputSchema = StructType.fromDDL("input BINARY")
+
+    val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    val pythonDs = new PythonDataSourceV2
+    pythonDs.setShortName("SimpleDataSource")
+    val stream = new PythonMicroBatchStream(
+      pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
+
+    val initialOffset = stream.initialOffset()
+    assert(initialOffset.json == "{\"offset\": {\"partition-1\": 0}}")
+    for (_ <- 1 to 50) {

Review Comment:
   It is a stress test to prove that the RPC calls can be invoked many times.


## python/pyspark/sql/streaming/python_streaming_source_runner.py: ##
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the Li
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1520229021


## python/pyspark/sql/datasource.py: ##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
         ...


+class DataSourceStreamReader(ABC):
+    """
+    A base class for streaming data source readers. Data source stream readers are responsible
+    for outputting data from a streaming data source.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.

Review Comment:
   Other types are just discouraged, not disallowed; list and float are also supposed to work.
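The test suite in this PR asserts `initialOffset.json == "{\"offset\": {\"partition-1\": 0}}"`, which is consistent with offsets crossing the JVM/Python boundary as JSON. Assuming that, a quick way to tell which "discouraged but working" values are actually safe is to check that an offset survives a `json.dumps`/`json.loads` round trip unchanged. The helper name is hypothetical:

```python
import json


def survives_json_round_trip(offset: dict) -> bool:
    """Return True if json.loads(json.dumps(offset)) reproduces the offset exactly."""
    try:
        return json.loads(json.dumps(offset)) == offset
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set or bytes,
        # and ValueError for circular references.
        return False


print(survives_json_round_trip({"partition-1": {"index": 3, "closed": True}}))  # True
print(survives_json_round_trip({"files": ["a", "b"], "ratio": 0.5}))  # True
print(survives_json_round_trip({"range": (1, 2)}))  # False: the tuple comes back as a list
print(survives_json_round_trip({1: "a"}))  # False: the int key comes back as "1"
print(survives_json_round_trip({"seen": {1, 2}}))  # False: sets are not JSON-encodable
```

Lists and floats pass, as the reply says; the cases that silently change (tuples, non-string keys) are the ones worth warning about in the docstring.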
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1519254087


## python/pyspark/sql/datasource.py: ##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
         ...


+class DataSourceStreamReader(ABC):
+    """
+    A base class for streaming data source readers. Data source stream readers are responsible
+    for outputting data from a streaming data source.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.

Review Comment:
   Should we say `str, int, bool`, as these are the actual Python type names?
   https://docs.python.org/3/library/json.html#json.dump

   Also, do we disallow other types, or are they just discouraged (for us) so we don't document them? The JSON module in the Python standard library supports more types.
   https://docs.python.org/3/library/json.html#json.JSONDecoder



## python/pyspark/sql/datasource.py: ##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
         ...


+class DataSourceStreamReader(ABC):
+    """
+    A base class for streaming data source readers. Data source stream readers are responsible
+    for outputting data from a streaming data source.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+
+        ...
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def latestOffset(self) -> dict:
+        """
+        Returns the most recent offset available.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes

Review Comment:
   ditto



## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs}
+import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+    """
+      |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
+      |
+      |class SimpleDataStreamReader(DataSourceStreamReader):
+      |    def initialOffset(self):
+      |        return {"offset": {"partition-1": 0}}
+      |    def latestOffset(self):
+      |        return {"offset": {"partition-1": 2}}
+      |    def partitions(self, start: dict, end: dict):
+      |
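The suite's `SimpleDataStreamReader` and its 50-iteration loop exercise the same call sequence a micro-batch driver issues: fetch the latest offset, plan partitions between the last committed offset and it, read each partition, commit, and advance. The sketch below replays that sequence in-process against a copy of the test script's reader logic (with a stand-in `InputPartition`). It only illustrates the call order and offset bookkeeping, not the socket RPC path the Scala test covers; the `run_microbatches` helper and its skip-when-caught-up rule are choices of this sketch.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.datasource.InputPartition.
InputPartition = namedtuple("InputPartition", ["value"])


class SimpleDataStreamReader:
    # Same logic as the reader embedded in the Scala test script.
    def initialOffset(self):
        return {"offset": {"partition-1": 0}}

    def latestOffset(self):
        return {"offset": {"partition-1": 2}}

    def partitions(self, start, end):
        start_index = start["offset"]["partition-1"]
        end_index = end["offset"]["partition-1"]
        return [InputPartition(i) for i in range(start_index, end_index)]

    def commit(self, end):
        pass

    def read(self, partition):
        yield (0, partition.value)
        yield (1, partition.value)
        yield (2, partition.value)


def run_microbatches(reader, iterations):
    """Hypothetical driver: returns the rows produced by each non-empty batch."""
    start = reader.initialOffset()
    batches = []
    for _ in range(iterations):
        end = reader.latestOffset()
        if end == start:
            continue  # caught up: nothing new since the last commit
        rows = [row for p in reader.partitions(start, end) for row in reader.read(p)]
        reader.commit(end)
        batches.append(rows)
        start = end
    return batches


batches = run_microbatches(SimpleDataStreamReader(), iterations=50)
print(len(batches))  # 1
print(batches[0])    # [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)]
```

Because `latestOffset` is constant in this reader, only the first iteration produces data; the remaining calls just re-query the offset, which is the repeated-call pattern the stress test is meant to cover.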
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1518252566


## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM.
+ * A runner spawns a python worker process. In the main function, set up communication
+ * between JVM and python process through socket and create a DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+    func: PythonFunction,
+    outputSchema: StructType) extends Logging {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+    logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+    val env = SparkEnv.get
+
+    val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
+    envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+    envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+    envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+    val prevConf = conf.get(PYTHON_USE_DAEMON)

Review Comment:
   Thanks for pointing it out; I removed the redundant conf reset.
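The `init()` code in this PR saves the previous `PYTHON_USE_DAEMON` value, forces it off while the worker is created, and restores it in `finally`. Because `finally` runs on both the success and the failure path, a single restore there is sufficient. The same save/override/restore idea as a Python context manager, with a plain dict standing in for `SparkConf`; the helper name is hypothetical, and `spark.python.use.daemon` is assumed here to be the key behind `PYTHON_USE_DAEMON`:

```python
from contextlib import contextmanager


@contextmanager
def temporarily_set(conf: dict, key: str, value):
    """Override conf[key] for the duration of the block, then restore the old state."""
    missing = object()
    previous = conf.get(key, missing)
    conf[key] = value
    try:
        yield conf
    finally:
        # Restoring in `finally` covers both normal exit and exceptions,
        # so no second restore is needed elsewhere.
        if previous is missing:
            del conf[key]
        else:
            conf[key] = previous


conf = {"spark.python.use.daemon": True}
with temporarily_set(conf, "spark.python.use.daemon", False):
    print(conf["spark.python.use.daemon"])  # False
print(conf["spark.python.use.daemon"])  # True
```

Like the Scala original, this mutates shared configuration for the duration of the block, so it is only appropriate where no other thread reads that key concurrently.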
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
WweiL commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1518173173 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. 
+ */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) + +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +dataOut.flush() + +dataIn = new DataInputStream
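The class comment above says the Python worker loops forever: it reads a function ID from the socket, calls the matching `DataSourceStreamReader` method, and sends the result back to the JVM. Below is a minimal, hypothetical Python sketch of that dispatch loop. Only the four function IDs come from the diff. Everything else is an assumption: the framing (4-byte big-endian ints, length-prefixed UTF-8 strings, JSON-encoded offsets), the commit acknowledgement, and the helper names (`serve`, `read_utf`, `write_utf`, `FakeReader`). It is not the actual `python_streaming_source_runner` module. `PARTITIONS_FUNC_ID` is left unhandled because returning `InputPartition` objects would require pickling. `io.BytesIO` stands in for the socket in both directions.

```python
import io
import json
import struct

# Function IDs copied from the PythonStreamingSourceRunner companion object.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885
PARTITIONS_FUNC_ID = 886  # not handled below: partitions would need pickling
COMMIT_FUNC_ID = 887


def read_int(stream) -> int:
    """Read a 4-byte big-endian int, the format of Java's DataOutputStream.writeInt."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("JVM side closed the stream")
    return struct.unpack("!i", data)[0]


def write_int(value: int, stream) -> None:
    stream.write(struct.pack("!i", value))


def read_utf(stream) -> str:
    # Assumed framing: 4-byte length followed by UTF-8 bytes.
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def write_utf(text: str, stream) -> None:
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def serve(reader, infile, outfile) -> None:
    """Poll function IDs until EOF and dispatch each one to the reader."""
    while True:
        try:
            func_id = read_int(infile)
        except EOFError:
            break
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_utf(json.dumps(reader.initialOffset()), outfile)
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_utf(json.dumps(reader.latestOffset()), outfile)
        elif func_id == COMMIT_FUNC_ID:
            reader.commit(json.loads(read_utf(infile)))
            write_int(0, outfile)  # hypothetical acknowledgement
        else:
            raise ValueError(f"unsupported function id: {func_id}")
        outfile.flush()


class FakeReader:
    """Hypothetical reader with fixed offsets, used only to drive serve()."""

    def __init__(self):
        self.committed = None

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self) -> dict:
        return {"offset": 10}

    def commit(self, end: dict) -> None:
        self.committed = end


# Simulate the JVM: queue three requests, then let the worker answer them.
request = io.BytesIO()
write_int(INITIAL_OFFSET_FUNC_ID, request)
write_int(LATEST_OFFSET_FUNC_ID, request)
write_int(COMMIT_FUNC_ID, request)
write_utf(json.dumps({"offset": 10}), request)
request.seek(0)

response = io.BytesIO()
reader = FakeReader()
serve(reader, request, response)

response.seek(0)
initial = json.loads(read_utf(response))
latest = json.loads(read_utf(response))
ack = read_int(response)
print(initial, latest, ack, reader.committed)
```

Because the stream carries no tags, each response is readable only because both sides agree on the order of requests and on how each reply is framed.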
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
WweiL commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1518165666 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) Review Comment: Yes, I agree we don't need this temporary set-and-reset here. This logic was in the Connect `StreamingPythonRunner` because of a concurrent development issue. I'll also create a PR to remove it there.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HyukjinKwon commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517362978 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" + +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def latestOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. 
+end : dict +The end offset of the microbatch to plan partitioning. + +Returns +--- +Sequence[InputPartition] +A sequence of partitions for this data source. Each partition value +must be an instance of `InputPartition` or a subclass of it. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) + +@abstractmethod +def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: Review Comment: It has to be `Iterator[Tuple]`, because `Row` inherits from `tuple`. We should fix the batch side too. Would you mind fixing it here as well? cc @allisonwang-db
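With the suggested annotation, a reader's `read` is typed as `Iterator[Tuple]`. A `Row` yielded from it still type-checks, because `Row` is a tuple. The sketch below shows a reader shaped like the interface in this diff: `initialOffset`, `latestOffset`, `partitions` and `read`, plus a `commit`, assumed here because the runner reserves `COMMIT_FUNC_ID`. All names are hypothetical. `CounterStreamReader` does not subclass the real `DataSourceStreamReader`, and `RangePartition` stands in for `InputPartition`, so the snippet runs without PySpark. Offsets are plain dicts of primitives, as the docstrings require.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass
class RangePartition:
    """Stand-in for InputPartition: the half-open id range [start, end)."""
    start: int
    end: int


class CounterStreamReader:
    """Hypothetical reader shaped like DataSourceStreamReader (not a subclass)."""

    def __init__(self, rows_per_batch: int = 4, num_partitions: int = 2):
        self.rows_per_batch = rows_per_batch
        self.num_partitions = num_partitions
        self.current = 0
        self.committed = None

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self) -> dict:
        # Pretend rows_per_batch new rows arrive between calls.
        self.current += self.rows_per_batch
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> List[RangePartition]:
        lo, hi = start["offset"], end["offset"]
        step = max(1, -(-(hi - lo) // self.num_partitions))  # ceiling division
        return [RangePartition(i, min(i + step, hi)) for i in range(lo, hi, step)]

    def read(self, partition: RangePartition) -> Iterator[Tuple]:
        # Plain tuples satisfy Iterator[Tuple]; so would pyspark.sql.Row objects.
        for i in range(partition.start, partition.end):
            yield (i, f"row-{i}")

    def commit(self, end: dict) -> None:
        self.committed = end


reader = CounterStreamReader()
start = reader.initialOffset()
end = reader.latestOffset()
parts = reader.partitions(start, end)
rows = [row for p in parts for row in reader.read(p)]
reader.commit(end)
print(parts)
print(rows)
```

One microbatch here plans `[0, 4)` as two partitions of two rows each. Each partition would be read by a separate Spark task.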
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517359698 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. 
+ */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) + +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +dataOut.flush() + +dataIn = new DataIn
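`init()` writes a fixed sequence to the worker socket: the Python version, the Spark files, the pickled function, and the output schema as JSON. The worker has to read these back in exactly that order, since the stream carries no field names. Below is a minimal sketch of that ordering using only two of the fields. It assumes both strings are framed the way `PythonWorkerUtils.writeUTF` frames text: a 4-byte big-endian length followed by UTF-8 bytes. The includes and the pickled function are omitted, the version string is made up, and `write_utf`/`read_utf` are hypothetical helper names.

```python
import io
import json
import struct


def write_utf(text: str, out) -> None:
    """Length-prefixed UTF-8 string: 4-byte big-endian length, then the bytes."""
    data = text.encode("utf-8")
    out.write(struct.pack("!i", len(data)))
    out.write(data)


def read_utf(inp) -> str:
    (length,) = struct.unpack("!i", inp.read(4))
    return inp.read(length).decode("utf-8")


# JVM side (simulated): the write order defines the protocol.
channel = io.BytesIO()
write_utf("3.11", channel)  # hypothetical Python version string
schema = {"type": "struct", "fields": [
    {"name": "id", "type": "long", "nullable": True, "metadata": {}},
]}
write_utf(json.dumps(schema), channel)

# Worker side: read the fields back in the same order.
channel.seek(0)
python_version = read_utf(channel)
output_schema = json.loads(read_utf(channel))
print(python_version, [f["name"] for f in output_schema["fields"]])
```

If either side reordered or skipped a field, the next length prefix would be read from the middle of a payload. That is why the runner and the worker module have to change together.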
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517356903 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. 
+ */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) + +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +dataOut.flush() + +dataIn = new DataIn
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517356072 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" + +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def latestOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. 
+end : dict +The end offset of the microbatch to plan partitioning. + +Returns +--- +Sequence[InputPartition] +A sequence of partitions for this data source. Each partition value +must be an instance of `InputPartition` or a subclass of it. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) + +@abstractmethod +def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: Review Comment: I copied the interface from the batch Python data source. Why is this Iterator[Union] instead of Union[Iterator]? @HyukjinKwon
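The sketch below shows how a user might implement the reader methods quoted above. It is a minimal, self-contained illustration and does not import pyspark. `InputPartition` is a local stand-in for the PySpark class. `RangePartition` and `CounterStreamReader` are hypothetical. The class mirrors the method names from the diff (`initialOffset`, `latestOffset`, `partitions`, `read`) plus `commit`, which the runner's `commit_func` calls. It does not subclass the real `DataSourceStreamReader`.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass
class InputPartition:
    """Hypothetical local stand-in for pyspark.sql.datasource.InputPartition."""
    value: object


@dataclass
class RangePartition(InputPartition):
    """Hypothetical partition covering the half-open row range [start, end)."""
    start: int = 0
    end: int = 0


class CounterStreamReader:
    """Hypothetical reader: each latestOffset() call makes `rows_per_batch` new rows available."""

    def __init__(self, rows_per_batch: int = 10, num_partitions: int = 2):
        self.rows_per_batch = rows_per_batch
        self.num_partitions = num_partitions
        self.current = 0
        self.committed = None

    def initialOffset(self) -> dict:
        # Offsets are plain dicts of primitive values so they can be JSON-encoded.
        return {"offset": 0}

    def latestOffset(self) -> dict:
        self.current += self.rows_per_batch
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict) -> List[InputPartition]:
        # Split [start, end) into contiguous ranges, roughly one per Spark task.
        lo, hi = start["offset"], end["offset"]
        step = max(1, -(-(hi - lo) // self.num_partitions))  # ceiling division
        return [
            RangePartition(value=i, start=s, end=min(s + step, hi))
            for i, s in enumerate(range(lo, hi, step))
        ]

    def read(self, partition: RangePartition) -> Iterator[Tuple]:
        # Each yielded tuple becomes one row of the micro-batch.
        for i in range(partition.start, partition.end):
            yield (i,)

    def commit(self, end: dict) -> None:
        # Record the end offset of a micro-batch that has been fully processed.
        self.committed = end
```

With the defaults, the first micro-batch spans offsets 0 to 10 and is planned as the two ranges [0, 5) and [5, 10).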
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HyukjinKwon commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517351498 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { Review Comment: Could either @rangadi or @WweiL take a look? This is somewhat similar to foreachBatch.
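The Scala comment describes a function-ID protocol: the JVM writes an integer such as 884, and the Python worker invokes the matching `StreamReader` method. The sketch below illustrates that idea in plain Python and is not the code in `python_streaming_source_runner`. It makes several assumptions:

- In-memory `io.BytesIO` buffers replace the socket.
- `read_int`, `write_int` and `write_with_length` are re-implemented locally, assuming 4-byte big-endian framing.
- `FakeReader` is hypothetical.
- The loop stops on end of input so the example terminates.

```python
import io
import json
import struct

# Function IDs as listed in PythonStreamingSourceRunner.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885


def write_int(value: int, stream) -> None:
    # Assumed framing: 4-byte big-endian signed integer.
    stream.write(struct.pack("!i", value))


def read_int(stream) -> int:
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream closed")
    return struct.unpack("!i", data)[0]


def write_with_length(payload: bytes, stream) -> None:
    # Length prefix followed by the raw bytes.
    write_int(len(payload), stream)
    stream.write(payload)


def read_with_length(stream) -> bytes:
    return stream.read(read_int(stream))


class FakeReader:
    """Hypothetical reader; only the two offset methods are needed here."""

    def initialOffset(self) -> dict:
        return {"partition-1": {"index": 0}}

    def latestOffset(self) -> dict:
        return {"partition-1": {"index": 3, "closed": True}}


def serve(reader, infile, outfile) -> None:
    """Read function IDs until the input runs out and dispatch each one."""
    handlers = {
        INITIAL_OFFSET_FUNC_ID: reader.initialOffset,
        LATEST_OFFSET_FUNC_ID: reader.latestOffset,
    }
    while True:
        try:
            func_id = read_int(infile)
        except EOFError:
            break
        if func_id not in handlers:
            raise ValueError(f"unknown function id {func_id}")
        # Offsets travel as UTF-8 JSON, mirroring json.dumps(...) in the runner.
        offset = handlers[func_id]()
        write_with_length(json.dumps(offset).encode("utf-8"), outfile)


# Simulate the JVM side: request the initial offset, then the latest one.
requests = io.BytesIO()
write_int(INITIAL_OFFSET_FUNC_ID, requests)
write_int(LATEST_OFFSET_FUNC_ID, requests)
requests.seek(0)

responses = io.BytesIO()
serve(FakeReader(), requests, responses)
responses.seek(0)
first = json.loads(read_with_length(responses))
second = json.loads(read_with_length(responses))
```

A dict of handlers keeps the ID-to-method mapping in one place. An unknown ID fails loudly instead of being silently skipped.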
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HyukjinKwon commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517348253 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: The IDE doesn't seem to catch this case. As Jungtaek said, these failures happen at runtime, so it'd be difficult to prevent them statically.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517209325 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: Python will fail at runtime either way, so it's more a question of which error message is easier to understand... But, I don't know, the IDE could come to the rescue. @HyukjinKwon Will the IDE be able to point it out if we define this as abstract and the implementation class does not implement it?
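The sketch below shows where each style fails. The class names are hypothetical.

- A method decorated with `abc.abstractmethod` makes instantiating an incomplete subclass fail with `TypeError`.
- A method whose body raises, as `initialOffset` does in the diff with `PySparkNotImplementedError`, fails only when it is called. The built-in `NotImplementedError` stands in for `PySparkNotImplementedError` here.

```python
from abc import ABC, abstractmethod


class AbstractStyleReader(ABC):
    @abstractmethod
    def initialOffset(self) -> dict:
        ...


class RuntimeStyleReader(ABC):
    def initialOffset(self) -> dict:
        # Stand-in for the PySparkNotImplementedError raised in the PR.
        raise NotImplementedError("initialOffset")


class IncompleteA(AbstractStyleReader):
    pass  # forgot to implement initialOffset


class IncompleteB(RuntimeStyleReader):
    pass  # forgot to implement initialOffset


def failure_point(cls) -> str:
    """Report whether a missing initialOffset surfaces at construction or at call time."""
    try:
        reader = cls()
    except TypeError:
        return "instantiation"
    try:
        reader.initialOffset()
    except NotImplementedError:
        return "call"
    return "none"
```

Both styles still fail at runtime. The abstract version fails earlier, as soon as the reader object is created, rather than when Spark first asks for an offset.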
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1516609093 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) Review Comment: Do we need to set this config? `workerFactory.createSimpleWorker(blockingMode = true)` already doesn't use daemon mode.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
sahnib commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1516471532 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: Should we make this abstract to force users to implement it? ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. + val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStr
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1511914150 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: We agreed offline that the offset interface should be a dict or recursive dict whose keys and values are primitive types.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1501274208 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. +end : dict +The end offset of the microbatch to plan partitioning. Review Comment: We use dict here because we want to enforce that the object is JSON serializable.
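The thread agreed that an offset is a dict or nested dict of primitive values so that it is JSON serializable. A small helper can check this. `validate_offset` is hypothetical and not part of PySpark. It assumes "primitive" means `str`, `int`, `float`, `bool` or `None`, and it checks that the offset survives a `json.dumps`/`json.loads` round trip. The round-trip check matters because JSON object keys are always strings, so an offset such as `{1: 5}` comes back as `{"1": 5}`.

```python
import json
from typing import Any

PRIMITIVES = (str, int, float, bool, type(None))


def _check(value: Any, path: str) -> None:
    # Recursively require dict nodes whose keys and leaf values are primitives.
    if isinstance(value, dict):
        for key, child in value.items():
            if not isinstance(key, PRIMITIVES):
                raise TypeError(f"{path}: unsupported key type {type(key).__name__}")
            _check(child, f"{path}.{key}")
    elif not isinstance(value, PRIMITIVES):
        raise TypeError(f"{path}: unsupported value type {type(value).__name__}")


def validate_offset(offset: Any) -> str:
    """Return the JSON encoding of `offset`, or raise if it is not a valid offset."""
    if not isinstance(offset, dict):
        raise TypeError("offset must be a dict")
    _check(offset, "offset")
    encoded = json.dumps(offset)
    if json.loads(encoded) != offset:
        # Typically caused by non-string keys, which JSON converts to strings.
        raise ValueError(f"offset does not round-trip through JSON: {encoded}")
    return encoded
```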
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1501273741 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict Review Comment: Changed to a dict or recursive dict whose keys and values must be primitive types. ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + Review Comment: Example added.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1492090151 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import json +from typing import IO + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.datasource import DataSource, DataSourceStreamReader +from pyspark.sql.types import ( +_parse_datatype_json_string, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None: +offset = reader.initialOffset() 
+write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader: DataSourceStreamReader, outfile: IO) -> None: +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> None: +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader: DataSourceStreamReader, infile: IO, outfile: IO) -> None: +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) +schema = _parse_datatype_json_string(schema_json) +if not isinstance(schema, StructType): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "an output schema of type 'StructType'", +"actual": f"'{type(schema).__name__}'", +}, +) + +# Instantiate data source reader. +try: +reader = data_source.streamReader(schema=schema) +# Initialization succeed. 
+write_int(0, outfile) +outfile.flush() + +# handle method call from socket +while True: +func_id = read_int(infile) +if func_id == initial_offset_func_id: +initial_offset_func(reader, outfile) +elif func_id == latest_offset_func_id: +latest_offset_func(reader, outfile) +elif func_id == partitions_func_id: +partitions_func(reader, infile, outfile) +elif func_id == commit_func_
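In the runner above, `partitions_func` writes the number of partitions, then each partition pickled with a length prefix. The sketch below round-trips that framing under stated assumptions:

- The count and each length are 4-byte big-endian integers.
- The standard `pickle` module stands in for PySpark's `pickleSer`.
- Each hypothetical partition is a plain `(start, end)` tuple.
- `encode_partitions` and `decode_partitions` are illustrative names, not PySpark functions.

```python
import io
import pickle
import struct
from typing import List


def write_int(value: int, stream) -> None:
    # Assumed framing: 4-byte big-endian signed integer.
    stream.write(struct.pack("!i", value))


def read_int(stream) -> int:
    return struct.unpack("!i", stream.read(4))[0]


def encode_partitions(partitions: List[object], stream) -> None:
    # Count first, then one length-prefixed pickle per partition.
    write_int(len(partitions), stream)
    for partition in partitions:
        payload = pickle.dumps(partition)
        write_int(len(payload), stream)
        stream.write(payload)


def decode_partitions(stream) -> List[bytes]:
    # A receiver that treats each partition as opaque bytes only needs the framing.
    count = read_int(stream)
    return [stream.read(read_int(stream)) for _ in range(count)]
```

The receiver in this sketch never unpickles anything. It only needs the count and the lengths to split the stream into per-partition payloads.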
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1491869371 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) 
+ +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +// Send configurations +dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch) +dataOut.flush() + +dataIn = new DataInputStream( + new BufferedInputStream(pythonWo
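`PythonStreamingSourceRunner` spawns one Python worker and then talks to it over a socket. The JVM sends a function id (884 to 887 in the runner), and the worker calls the matching reader method and writes the result back. The following is a minimal, hypothetical sketch of that request/response loop over a local socket pair. It handles only the `latestOffset` id (885) and omits the handshake (Python version, Spark files, pickled function, schema, Arrow batch size). The 4-byte big-endian integer framing is an assumption modeled on PySpark's `write_int`/`write_with_length`. `CountingReader` is made up, and the "JVM" side is written in Python for brevity.

```python
import json
import socket
import struct
import threading

LATEST_OFFSET_FUNC_ID = 885  # same value as latest_offset_func_id in the runner


def write_int(stream, value):
    # 4-byte big-endian signed int (assumed framing).
    stream.write(struct.pack("!i", value))


def read_int(stream):
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("peer closed the socket")
    return struct.unpack("!i", data)[0]


class CountingReader:
    """Hypothetical stand-in for a user-defined DataSourceStreamReader."""

    def __init__(self):
        self.end = 0

    def latestOffset(self):
        self.end += 2
        return {"offset": str(self.end)}


def worker_loop(sock, reader):
    # "Python worker" side: poll a function id, invoke the reader, reply with JSON.
    with sock.makefile("rwb") as f:
        while True:
            try:
                func_id = read_int(f)
            except EOFError:
                return  # the "JVM" side closed the connection
            if func_id != LATEST_OFFSET_FUNC_ID:
                raise ValueError(f"unsupported function id {func_id}")
            payload = json.dumps(reader.latestOffset()).encode("utf-8")
            write_int(f, len(payload))
            f.write(payload)
            f.flush()


def request_latest_offset(f):
    # "JVM" side: send the function id, then read a length-prefixed UTF-8 string.
    write_int(f, LATEST_OFFSET_FUNC_ID)
    f.flush()
    return f.read(read_int(f)).decode("utf-8")


jvm_sock, worker_sock = socket.socketpair()
worker = threading.Thread(target=worker_loop, args=(worker_sock, CountingReader()))
worker.start()
with jvm_sock.makefile("rwb") as jvm_file:
    first = request_latest_offset(jvm_file)
    second = request_latest_offset(jvm_file)
jvm_sock.close()
worker.join()
worker_sock.close()
```

The single long-lived worker keeps the reader instance, and any driver-side state it holds, alive across calls. Here `CountingReader` advances on each request.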
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1491663766 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. +end : dict +The end offset of the microbatch to plan partitioning. + +Returns +--- +Sequence[InputPartition] +A sequence of partitions for this data source. Each partition value +must be an instance of `InputPartition` or a subclass of it. +""" +... 
+raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) + +@abstractmethod +def read(self, partition) -> Iterator[Union[Tuple, Row]]: +""" +Generates data for a given partition and returns an iterator of tuples or rows. + +This method is invoked once per partition to read the data. Implementing +this method is required for stream reader. You can initialize any +non-serializable resources required for reading data from the data source +within this method. +This method is static and stateless. You shouldn't access mutable class member +or keep in memory state between different invocations of read(). + +Parameters +-- +partition : object +The partition to read. It must be one of the partition values returned by +``partitions()``. + +Returns +--- +Iterator[Tuple] or Iterator[Row] +An iterator of tuples or rows. Each tuple or row will be converted to a row +in the final DataFrame. +""" +... + +def commit(self, end: dict): Review Comment: Return type added.
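The following is a rough illustration of how the `DataSourceStreamReader` methods fit together. It shows a hypothetical reader that exposes a growing range of row ids. The sketch is deliberately self-contained:

* `InputPartition` is a local stand-in dataclass, not the PySpark class.
* The reader does not subclass `DataSourceStreamReader`.
* The call sequence at the end is one simplified micro-batch, not the exact order in which Spark's engine invokes these methods.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Tuple


@dataclass
class InputPartition:
    """Stand-in for pyspark.sql.datasource.InputPartition, which carries a `value`."""
    value: int


class RangeStreamReader:
    """Hypothetical reader; real code would subclass DataSourceStreamReader."""

    def __init__(self, rows_per_batch: int = 3):
        self.rows_per_batch = rows_per_batch
        self.available = 0  # how many row ids the "source" has produced so far

    def initialOffset(self) -> Dict[str, str]:
        return {"offset": "0"}

    def latestOffset(self) -> Dict[str, str]:
        self.available += self.rows_per_batch
        return {"offset": str(self.available)}

    def partitions(self, start: Dict[str, str], end: Dict[str, str]) -> List[InputPartition]:
        # One partition per row id in [start, end); each would be one Spark task.
        return [InputPartition(i) for i in range(int(start["offset"]), int(end["offset"]))]

    def read(self, partition: InputPartition) -> Iterator[Tuple[int]]:
        # Uses only the partition value, so no state is shared between read() calls.
        yield (partition.value,)

    def commit(self, end: Dict[str, str]) -> None:
        pass  # nothing to release in this sketch


# Simplified micro-batch: plan from the initial offset to the latest one, read, commit.
reader = RangeStreamReader()
start = reader.initialOffset()
end = reader.latestOffset()
rows = [row for p in reader.partitions(start, end) for row in reader.read(p)]
reader.commit(end)
```

`read()` depends only on its partition argument. This matches the docstring's requirement that it stay stateless, because partitions are read separately from the planning calls.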
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1491464370 ## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs} +import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { + + protected def simpleDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class SimpleDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |return {"offset": "0"} + |def latestOffset(self): + |return {"offset": "2"} + |def partitions(self, start: dict, end: dict): + |return [InputPartition(i) for i in range(int(start["offset"]))] + |def commit(self, end: dict): + |1 + 2 + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + protected def errorDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class ErrorDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |raise Exception("error reading initial offset") + |def latestOffset(self): + |raise Exception("error reading latest offset") + |def partitions(self, start: dict, end: dict): + |raise Exception("error planning partitions") + |def commit(self, end: dict): + |raise Exception("error committing offset") + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + private val errorDataSourceName = "ErrorDataSource" + + test("simple data stream source") { +assume(shouldTestPandasUDFs) +val dataSourceScript = + s""" + 
|from pyspark.sql.datasource import DataSource + |$simpleDataStreamReaderScript + | + |class $dataSourceName(DataSource): + |def streamReader(self, schema): + |return SimpleDataStreamReader() + |""".stripMargin +val inputSchema = StructType.fromDDL("input BINARY") + +val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript) +spark.dataSource.registerPython(dataSourceName, dataSource) +val pythonDs = new PythonDataSourceV2 +pythonDs.setShortName("SimpleDataSource") +val stream = new PythonMicroBatchStream( + pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty()) + +val initialOffset = stream.initialOffset() +assert(initialOffset.json == "{\"offset\": \"0\"}") +for (_ <- 1 to 50) { + val offset = stream.latestOffset() + assert(offset.json == "{\"offset\": \"2\"}") + assert(stream.planInputPartitions(offset, offset).size == 2) + stream.commit(offset) +} +stream.stop() Review Comment: read() will be implemented in the next PR; it is executed on the executors.
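The Scala assertions compare offsets as JSON strings such as `"{\"offset\": \"0\"}"`. The worker produces that text with `json.dumps(offset)`, and the sketch below assumes the string reaches `offset.json` unchanged. Python's default separators put a space after the colon, which explains the exact expected strings. The partition count follows from the test script's `range(int(start["offset"]))`.

```python
import json

# Offsets returned by the test's SimpleDataStreamReader.
initial = {"offset": "0"}
latest = {"offset": "2"}

# Default separators are (", ", ": "), so a space follows each colon.
initial_json = json.dumps(initial)
latest_json = json.dumps(latest)

# The test script's partitions() returns one InputPartition per value in
# range(int(start["offset"])); planning with start == end == latest gives two.
num_partitions = len(range(int(json.loads(latest_json)["offset"])))
```

If the Python side changed its serialization, for example by using `separators=(",", ":")`, these string comparisons would stop matching even though the offsets are equal.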
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1491464124 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,159 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import json +from typing import IO + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.datasource import DataSource +from pyspark.sql.types import ( +_parse_datatype_json_string, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() Review Comment: We throw a not-implemented error.
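In the quoted base class, `read` is the only `@abstractmethod`. `initialOffset` and the other planning methods are ordinary methods whose default body raises. A reader that skips them can still be constructed, and the error surfaces only when the method is called. The sketch below reproduces that pattern with a hypothetical `NotImplementedFeatureError` standing in for `PySparkNotImplementedError`, since the real error class needs PySpark installed.

```python
from abc import ABC, abstractmethod


class NotImplementedFeatureError(NotImplementedError):
    """Hypothetical stand-in for PySparkNotImplementedError(error_class="NOT_IMPLEMENTED")."""

    def __init__(self, feature: str):
        super().__init__(f"{feature} is not implemented")
        self.feature = feature


class StreamReaderBase(ABC):
    def initialOffset(self) -> dict:
        # Not abstract: subclasses may skip it, but calling it then fails loudly.
        raise NotImplementedFeatureError("initialOffset")

    @abstractmethod
    def read(self, partition):
        ...


class OnlyRead(StreamReaderBase):
    def read(self, partition):
        yield (partition,)


class NoRead(StreamReaderBase):
    pass


reader = OnlyRead()  # allowed: the only abstract method is implemented
try:
    reader.initialOffset()
    raised = None
except NotImplementedFeatureError as exc:
    raised = exc

try:
    NoRead()  # rejected at construction time because read() is abstract
    no_read_rejected = False
except TypeError:
    no_read_rejected = True
```

The trade-off is when the error appears. An abstract method fails as soon as the class is instantiated. A raising default fails only if Spark actually calls that method.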
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1491463836 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. +end : dict +The end offset of the microbatch to plan partitioning. + +Returns +--- +Sequence[InputPartition] +A sequence of partitions for this data source. Each partition value +must be an instance of `InputPartition` or a subclass of it. +""" +... 
+raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) + +@abstractmethod +def read(self, partition) -> Iterator[Union[Tuple, Row]]: +""" +Generates data for a given partition and returns an iterator of tuples or rows. + +This method is invoked once per partition to read the data. Implementing +this method is required for stream reader. You can initialize any +non-serializable resources required for reading data from the data source +within this method. +This method is static and stateless. You shouldn't access mutable class member +or keep in memory state between different invocations of read(). + +Parameters +-- +partition : object +The partition to read. It must be one of the partition values returned by +``partitions()``. + +Returns +--- +Iterator[Tuple] or Iterator[Row] +An iterator of tuples or rows. Each tuple or row will be converted to a row +in the final DataFrame. +""" +... + +def commit(self, end: dict): Review Comment: This function is not supposed to return anything; how do we express that in Python?
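The usual Python idiom for "returns nothing" is the return annotation `-> None`. Type checkers then flag any caller that tries to use the result, and `typing.get_type_hints` reports the annotation as `NoneType`. The `Reader` class and its docstring below are illustrative only.

```python
import typing


class Reader:
    def commit(self, end: dict) -> None:
        """Informs the reader that data up to `end` has been processed; returns nothing."""


hints = typing.get_type_hints(Reader.commit)
result = Reader().commit({"offset": "2"})
```

A function whose body has no `return` statement returns `None` at runtime anyway. The annotation only documents that contract and lets tools check it.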
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490557068 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict Review Comment: Nit: Dict[str, str]? Is this case sensitive or case insensitive? ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,159 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +import sys +import json +from typing import IO + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.datasource import DataSource +from pyspark.sql.types import ( +_parse_datatype_json_string, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() Review Comment: What if the initialOffset is not implemented? ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. 
+ +Returns Review Comment: ditto for examples ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,117 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + Review Comment: Could you also provide an example of what the dictionary looks like? ``` Examples - ... ``` ## python/pys
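The following shows what an offset dictionary could look like under the `Dict[str, str]` typing suggested in the review. It is a hypothetical offset for a source with two partitions; the key names and numbers are made up. The snippet demonstrates plain Python and JSON semantics only. The thread does not say whether Spark itself normalizes key case.

```python
import json
from typing import Dict

# Hypothetical offset for a source with two partitions; names and numbers are made up.
offset: Dict[str, str] = {"partition-0": "120", "partition-1": "97"}

# The worker sends offsets to the JVM as JSON, so str keys and values round-trip cleanly.
encoded = json.dumps(offset)
decoded: Dict[str, str] = json.loads(encoded)

# Python dict keys (and JSON object keys) are case-sensitive: these are two distinct entries.
mixed_case = {"Offset": "1", "offset": "2"}
```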
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490256089 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) Review Comment: We only use the data source schema when the user doesn't specify a schema. https://github.com/apache/spark/blob/736d8ab3f00e7c5ba1b01c22f6398b636b8492ea/python/pyspark/sql/worker/create_data_source.py#L144
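`partitions_func` reads two UTF-8 JSON offsets from the JVM and replies with a partition count. Each partition then follows as its own length-prefixed pickled value. The following hypothetical sketch replays that exchange in memory with `io.BytesIO`. Several details are assumptions:

* The framing is a 4-byte big-endian length before each payload, modeled on PySpark's serializers.
* `pickle.dumps` substitutes for `pickleSer._write_with_length`.
* The partitions are plain ints instead of `InputPartition` objects.

```python
import io
import json
import pickle
import struct


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


def write_utf8(s, stream):
    data = s.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def read_utf8(stream):
    return stream.read(read_int(stream)).decode("utf-8")


class Reader:
    """Hypothetical reader: one partition per offset step."""

    def partitions(self, start, end):
        return list(range(int(start["offset"]), int(end["offset"])))


def partitions_func(reader, infile, outfile):
    # Two JSON offsets in; a count followed by length-prefixed pickled partitions out.
    start = json.loads(read_utf8(infile))
    end = json.loads(read_utf8(infile))
    parts = reader.partitions(start, end)
    write_int(len(parts), outfile)
    for p in parts:
        data = pickle.dumps(p)
        write_int(len(data), outfile)
        outfile.write(data)


# Build the request the JVM would send, run the handler, then decode the reply.
request = io.BytesIO()
write_utf8(json.dumps({"offset": "0"}), request)
write_utf8(json.dumps({"offset": "3"}), request)
request.seek(0)

reply = io.BytesIO()
partitions_func(Reader(), request, reply)
reply.seek(0)
count = read_int(reply)
decoded = [pickle.loads(reply.read(read_int(reply))) for _ in range(count)]
```

Sending the count first lets the receiver know how many length-prefixed items to read, with no terminator value.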
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490215001 ## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs} +import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { + + protected def simpleDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class SimpleDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |return {"0": "2"} + |def latestOffset(self): + |return {"0": "2"} Review Comment: Nice suggestion, changed. 
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490214832 ## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs} +import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { + + protected def simpleDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class SimpleDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |return {"0": "2"} + |def latestOffset(self): + |return {"0": "2"} + |def partitions(self, start: dict, end: dict): + |return [InputPartition(i) for i in range(int(start["0"]))] + |def commit(self, end: dict): + |1 + 2 + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + protected def errorDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class ErrorDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |raise Exception("error reading initial offset") + |def latestOffset(self): + |raise Exception("error reading latest offset") + |def partitions(self, start: dict, end: dict): + |raise Exception("error planning partitions") + |def commit(self, end: dict): + |raise Exception("error committing offset") + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + private val errorDataSourceName = "ErrorDataSource" + + test("simple data stream source") { +assume(shouldTestPandasUDFs) +val dataSourceScript = + s""" + |from 
pyspark.sql.datasource import DataSource + |$simpleDataStreamReaderScript + | + |class $dataSourceName(DataSource): + |def streamReader(self, schema): + |return SimpleDataStreamReader() + |""".stripMargin +val inputSchema = StructType.fromDDL("input BINARY") + +val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript) +spark.dataSource.registerPython(dataSourceName, dataSource) +val pythonDs = new PythonDataSourceV2 +pythonDs.setShortName("SimpleDataSource") +val stream = new PythonMicroBatchStream( + pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty()) + +val initialOffset = stream.initialOffset() +assert(initialOffset.json == "{\"0\": \"2\"}") +for (_ <- 1 to 50) { + val offset = stream.latestOffset() + assert(offset.json == "{\"0\": \"2\"}") + assert(stream.planInputPartitions(offset, offset).size == 2) + stream.commit(offset) +} +stream.stop() + } + + test("Error creating stream reader") { +assume(shouldTestPandasUDFs) +val dataSourceScript = + s""" + |from pyspark.sql.datasource import DataSource + |class $dataSourceName(DataSource): + |def streamReader(self, schema): + |raise Exception("error creating stream reader") + |""".stripMargin +val dataSource = createUserDefinedPythonDataSource( + name = dataSourceName, pythonScript = dataSourceScript) +spark.dataSource.registerPython(dataSourceName, dataSourc
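The error tests rely on exceptions raised in the Python reader reaching the JVM. Both sides of the quoted code import `SpecialLengths`, and the worker imports `handle_worker_exception`. The following is a simplified, hypothetical sketch of one way to signal such a failure: instead of a payload length, send a negative marker followed by the traceback text. The marker value `-2` is an assumption meant to mirror `SpecialLengths.PYTHON_EXCEPTION_THROWN`. The caller side is written in Python for brevity, and the `RuntimeError` wrapping is made up.

```python
import io
import json
import struct
import traceback

# Assumed to mirror SpecialLengths.PYTHON_EXCEPTION_THROWN in PySpark.
PYTHON_EXCEPTION_THROWN = -2


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


class ErrorReader:
    """Mirrors the test's ErrorDataStreamReader for a single method."""

    def latestOffset(self):
        raise Exception("error reading latest offset")


def latest_offset_func(reader, outfile):
    # Worker side: a normal reply is a length-prefixed JSON string; a failure is
    # the negative marker followed by the length-prefixed traceback text.
    try:
        payload = json.dumps(reader.latestOffset()).encode("utf-8")
    except Exception:
        payload = traceback.format_exc().encode("utf-8")
        write_int(PYTHON_EXCEPTION_THROWN, outfile)
    write_int(len(payload), outfile)
    outfile.write(payload)


def read_reply(infile):
    # Caller side (the JVM in Spark, written in Python here for brevity).
    length = read_int(infile)
    if length == PYTHON_EXCEPTION_THROWN:
        message = infile.read(read_int(infile)).decode("utf-8")
        raise RuntimeError(f"Python stream reader failed:\n{message}")
    return infile.read(length).decode("utf-8")


channel = io.BytesIO()
latest_offset_func(ErrorReader(), channel)
channel.seek(0)
try:
    read_reply(channel)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

The marker works because a valid payload length is never negative. The reader can therefore tell data from an error report by the first integer alone.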
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490214280 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) 
+ +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +// Send configurations +dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch) +dataOut.flush() + +dataIn = new DataInputStream( + new BufferedInputStream(pytho
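In `init()` above, the JVM writes its handshake in a fixed order: the Python version, the Spark files, the pickled user function, the output schema as a JSON string, and then `arrowMaxRecordsPerBatch`. The Python worker has to consume the fields in exactly the same order. The sketch below covers only the last two fields. It assumes each string is framed as a big-endian 4-byte byte length followed by UTF-8 bytes, and that each int is a big-endian 4-byte value, which is what `java.io.DataOutputStream.writeInt` produces. The helper names are hypothetical stand-ins, not PySpark's own serializers.

```python
import io
import struct


# Hypothetical framing helpers; big-endian 4-byte ints match DataOutputStream.writeInt.
def write_int(value: int, stream) -> None:
    stream.write(struct.pack(">i", value))


def read_int(stream) -> int:
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended before a 4-byte int could be read")
    return struct.unpack(">i", data)[0]


def write_utf(text: str, stream) -> None:
    # Length prefix (in bytes, not characters) followed by the UTF-8 payload.
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def read_utf(stream) -> str:
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def jvm_send_tail(stream, schema_json: str, max_records_per_batch: int) -> None:
    """Simulates the JVM writing the output schema, then the config int."""
    write_utf(schema_json, stream)
    write_int(max_records_per_batch, stream)


def worker_read_tail(stream):
    """Simulates the worker reading the same two fields in the same order."""
    schema_json = read_utf(stream)
    max_batch = read_int(stream)
    if max_batch <= 0:
        raise ValueError(f"max arrow batch size must be > 0, got {max_batch}")
    return schema_json, max_batch


buf = io.BytesIO()
jvm_send_tail(buf, '{"type":"struct","fields":[]}', 10000)
buf.seek(0)
print(worker_read_tail(buf))  # ('{"type":"struct","fields":[]}', 10000)
```

If the worker swapped the two reads, it would treat the schema's length prefix as the batch size. Neither side can detect that kind of mismatch, so both have to agree on the order.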
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490213270 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) +schema = _parse_datatype_json_string(schema_json) +if not isinstance(schema, StructType): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "an output schema of type 'StructType'", +"actual": f"'{type(schema).__name__}'", +}, +) + +# Receive the configuration values. Review Comment: Removed. 
It is no longer necessary because the pickled read function will be created from another python worker.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490211705 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) +schema = _parse_datatype_json_string(schema_json) +if not isinstance(schema, StructType): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "an output schema of type 'StructType'", +"actual": f"'{type(schema).__name__}'", +}, +) + +# Receive the configuration values. 
+max_arrow_batch_size = read_int(infile) +assert max_arrow_batch_size > 0, ( +"The maximum arrow batch size should be greater than 0, but got " +f"'{max_arrow_batch_size}'" +) + +# Instantiate data source reader. +try: +reader = data_source.streamReader(schema=schema) +# Initialization succeed. +write_int(0, outfile) +outfile.flush() + +# handle method call from socket +while True: +func_id = read_int(infile) +
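After setup, the worker under review enters a `while True` loop. Each iteration reads a function id (884 to 887) and calls the matching handler, such as `latest_offset_func` or `commit_func`. Below is a self-contained sketch of that request/response protocol over in-memory streams. It rests on several assumptions. Strings are framed as a big-endian 4-byte length plus UTF-8 bytes. `CounterReader` is a hypothetical reader. Partitions are sent as JSON rather than pickled. The loop stops at end of input instead of looping forever.

```python
import io
import json
import struct

# Function ids as defined in the runner under review.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885
PARTITIONS_FUNC_ID = 886
COMMIT_FUNC_ID = 887


def write_int(value, stream):
    stream.write(struct.pack(">i", value))


def read_int(stream):
    return struct.unpack(">i", stream.read(4))[0]


def write_str(text, stream):
    data = text.encode("utf-8")
    write_int(len(data), stream)
    stream.write(data)


def read_str(stream):
    return stream.read(read_int(stream)).decode("utf-8")


class CounterReader:
    """Hypothetical reader whose offsets look like {"offset": n}."""

    def __init__(self):
        self.latest = 0
        self.committed = None

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        self.latest += 2  # pretend two new records arrive per poll
        return {"offset": self.latest}

    def partitions(self, start, end):
        return list(range(start["offset"], end["offset"]))

    def commit(self, end):
        self.committed = end


def serve(reader, infile, outfile):
    """Dispatch function ids until the input stream is exhausted."""
    while True:
        header = infile.read(4)
        if len(header) < 4:
            break
        func_id = struct.unpack(">i", header)[0]
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_str(json.dumps(reader.initialOffset()), outfile)
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_str(json.dumps(reader.latestOffset()), outfile)
        elif func_id == PARTITIONS_FUNC_ID:
            start = json.loads(read_str(infile))
            end = json.loads(read_str(infile))
            parts = reader.partitions(start, end)
            write_int(len(parts), outfile)
            for part in parts:
                write_str(json.dumps(part), outfile)  # the code under review pickles these
        elif func_id == COMMIT_FUNC_ID:
            reader.commit(json.loads(read_str(infile)))
            write_int(0, outfile)  # acknowledgement, as in commit_func
        else:
            raise ValueError(f"unknown function id {func_id}")


# Usage: one latest-offset request, then one partitions request.
requests = io.BytesIO()
write_int(LATEST_OFFSET_FUNC_ID, requests)
write_int(PARTITIONS_FUNC_ID, requests)
write_str(json.dumps({"offset": 0}), requests)
write_str(json.dumps({"offset": 2}), requests)
requests.seek(0)
responses = io.BytesIO()
serve(CounterReader(), requests, responses)
responses.seek(0)
print(read_str(responses))  # {"offset": 2}
print(read_int(responses))  # 2
```

The design point the sketch highlights is that every request carries its own arguments after the id. Reader and writer must therefore agree on how many values follow each id, just as the handlers above read two offsets for `partitions` and one for `commit`.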
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490134034 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: UPDATE: we are discussing this offline now and it could take some time; this is tied to the interface and would be very hard to change after it ships.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490111896 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: That essentially means we need to add the deserializeOffset() interface to the StreamReader for deserialization. Or is there any alternative?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: Maybe yes. I'm OK with a dict if we clearly explain which types can be used as values (on the flip side, we still need to restrict the types since we have to convert this to JSON), but it is probably less of a headache to let data source implementations take care of this themselves. ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: Maybe yes.
I'm OK with a dict if we clearly explain which types can be used as values (on the flip side, we still need to restrict the available types since we have to convert this to JSON), but it is probably less of a headache to let data source implementations take care of this themselves.
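If offsets stay plain dicts, "which types can be used as values" effectively comes down to the `json` module's Python-to-JSON conversion table. The sketch below shows a check a worker could run before handing an offset to the JVM. `check_offset_json` is a hypothetical helper, not part of PySpark. It rejects values that `json.dumps` cannot encode, such as a `set`. It also rejects values that change on a round trip, such as a tuple, which comes back as a list.

```python
import json


def check_offset_json(offset: dict) -> str:
    """Return the JSON form of a dict offset, or raise ValueError if it is unsafe."""
    try:
        encoded = json.dumps(offset)
    except TypeError as exc:  # e.g. set, datetime, custom objects
        raise ValueError(f"offset is not JSON serializable: {exc}") from exc
    # A lossy conversion (tuple -> list, int key -> str key) would otherwise only
    # surface later, when the JSON is deserialized again.
    if json.loads(encoded) != offset:
        raise ValueError(f"offset does not survive a JSON round trip: {offset!r}")
    return encoded


print(check_offset_json({"partition-0": 42, "files": ["a.json"], "done": False}))
# {"partition-0": 42, "files": ["a.json"], "done": false}
```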
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: Maybe yes. I'm OK with a dict if we clearly explain which types can be used as values, but it is probably less of a headache to let data source implementations take care of this themselves.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490107780 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: So the data source implementation provides serde between its offset model and JSON, and the Python worker handles offsets using the provided serde. Between the Python worker and the JVM, JSON is used. Does this make sense?
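The split proposed here can be sketched as follows: the data source owns the conversion between its offset model and JSON, and only JSON crosses the Python/JVM boundary. `FileOffset`, `FileStreamReader`, and the `offset_to_json`/`offset_from_json` method names are all hypothetical; the thread had not settled on the actual interface at this point.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FileOffset:
    """Hypothetical offset model private to one data source."""
    file_index: int


class FileStreamReader:
    """Hypothetical reader that owns the serde for its own offset model."""

    def __init__(self):
        self.committed = None

    def latest_offset(self) -> FileOffset:
        return FileOffset(file_index=3)

    def commit(self, end: FileOffset) -> None:
        self.committed = end

    def offset_to_json(self, offset: FileOffset) -> str:
        return json.dumps({"file_index": offset.file_index})

    def offset_from_json(self, text: str) -> FileOffset:
        return FileOffset(file_index=json.loads(text)["file_index"])


# Worker-side glue: the JVM only ever sends and receives JSON strings.
def handle_latest_offset(reader) -> str:
    return reader.offset_to_json(reader.latest_offset())


def handle_commit(reader, end_json: str) -> None:
    reader.commit(reader.offset_from_json(end_json))


reader = FileStreamReader()
end_json = handle_latest_offset(reader)  # what would be sent to the JVM
handle_commit(reader, end_json)          # what the JVM would send back later
print(reader.committed)  # FileOffset(file_index=3)
```

The generic worker code never inspects the offset's structure. That is why a deserialization hook on the reader side, like the `deserializeOffset()` raised in the replies, is needed in this design.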
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490106637 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: Does that mean we also need to add deserializeOffset() to the python interface?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490104863 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: We expect the data source to know about its own offset model: we pass the JSON form of the offset to the specific data source, which is expected to deserialize it successfully into its offset model. If the JSON form of the offset and the offset model aren't compatible, the result is a runtime error, which I think is what the Scala side does as well.
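The sketch below shows the failure mode described here. The data source's deserializer validates the JSON it receives and fails fast with a clear error when the JSON does not match its offset model, for example an offset in some other source's format. `parse_file_offset` and the `{"file_index": <int>}` shape are hypothetical.

```python
import json


def parse_file_offset(text: str) -> int:
    """Deserialize a hypothetical offset shaped like {"file_index": <int>}."""
    data = json.loads(text)
    index = data.get("file_index") if isinstance(data, dict) else None
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(index, bool) or not isinstance(index, int):
        raise ValueError(f"offset JSON does not match this source's model: {text}")
    return index


print(parse_file_offset('{"file_index": 7}'))  # 7
try:
    parse_file_offset('{"partition": 0, "offset": 7}')
except ValueError as exc:
    print(exc)
```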
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1489903039 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: Serialization is easy, but deserialization will be tricky. We don't have the runtime type information needed to deserialize plain JSON text back into a Python object (we don't even know which Python type the JSON was serialized from unless we keep extra type information in the JSON).
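Here is a quick demonstration of the ambiguity described in this comment, using only the standard `json` module. Once an offset is reduced to plain JSON text, the original Python types are gone, so generic code cannot rebuild the object without help from the data source. `PartitionOffset` is a hypothetical class.

```python
import json

original = {"range": (0, 100), 7: "partition-7"}
restored = json.loads(json.dumps(original))
print(restored)  # {'range': [0, 100], '7': 'partition-7'}
# The tuple became a list and the int key became a str key:
print(restored == original)  # False


class PartitionOffset:
    """Hypothetical structured offset."""

    def __init__(self, partition: int, offset: int):
        self.partition = partition
        self.offset = offset


def encode(obj):
    # json.dumps calls this only for objects it cannot serialize natively.
    if isinstance(obj, PartitionOffset):
        return {"partition": obj.partition, "offset": obj.offset}
    raise TypeError(f"cannot serialize {type(obj).__name__}")


text = json.dumps([PartitionOffset(0, 5)], default=encode)
# Decoding yields plain dicts; nothing in the text says "PartitionOffset".
print(json.loads(text))  # [{'partition': 0, 'offset': 5}]
```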
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1489279083 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) Review Comment: What is the expected behavior if the user does not specify a schema and relies on the data source's own schema, i.e. schema inference (which the majority of data sources rely on)?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1489259159 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: I just realized this is too limited: some data sources have a complex offset model, e.g. an offset that consists of a list whose elements are case class instances. Such a model probably can't be mapped onto a flat dictionary. Shall we revisit the design of the offset structure? The approach closest to the proposed change is probably to simply allow a dict, noting that we serialize the dict to JSON underneath via the following conversion (hence the dict has to be compatible with it). https://docs.python.org/3/library/json.html#py-to-json-table Alternatively, we could introduce an interface for Offset that is expected to handle JSON serde by itself. We could then document an example of how to implement JSON serde for a complex offset type. ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    read_int,
+    write_int,
+    write_with_length,
+    SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+    _parse_datatype_json_string,
+    BinaryType,
+    StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+    check_python_version,
+    read_command,
+    pickleSer,
+    send_accumulator_updates,
+    setup_broadcasts,
+    setup_memory_limits,
+    setup_spark_files,
+    utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+    offset = reader.initialOffset()
+    write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+    offset = reader.latestOffset()
+    write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+    start_offset = json.loads(utf8_deserializer.loads(infile))
+    end_offset = json.loads(utf8_deserializer.loads(infile))
+    partitions = reader.partitions(start_offset, end_offset)
+    # Return the serialized partition values.
+    write_int(len(partitions), outfile)
+    for partition in partitions:
+        pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+    end_offset = json.loads(utf8_deserializer.loads(infile))
+    reader.commit(end_offset)
+    write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+    try:
+        check_python_version(infile)
+        setup_spark_files(infile)
+
+        memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1"))
+        setup_memory_limits(memory_limit_mb)
+
+        _accumulatorRegistry.cl
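To make the offset-structure concern in the review comment above concrete, here is a minimal sketch of the second option it raises: an offset type that handles its own JSON serde. Everything named here is hypothetical and not part of the PR: the `Offset` base class with `to_json`/`from_json`, the `PartitionOffset` element type (standing in for a Scala case class), and the `ListOffset` container (an offset model built from a list of such elements). The `write_with_length`/`read_utf8` helpers are stdlib-only stand-ins for the runner's length-prefixed framing, assumed here to be a 4-byte big-endian length followed by the UTF-8 payload; they are not the `pyspark.serializers` functions themselves.

```python
import io
import json
import struct
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from typing import List


class Offset(ABC):
    """Hypothetical offset interface that owns its JSON serde."""

    @abstractmethod
    def to_json(self) -> str:
        """Serialize this offset to a JSON string."""

    @classmethod
    @abstractmethod
    def from_json(cls, s: str) -> "Offset":
        """Rebuild an offset from the string produced by to_json()."""


@dataclass(frozen=True)
class PartitionOffset:
    # Hypothetical element type, playing the role of a Scala case class.
    topic: str
    partition: int
    position: int


@dataclass(frozen=True)
class ListOffset(Offset):
    # Hypothetical complex offset: a list of structured elements rather
    # than a flat dict of str -> str.
    partitions: List[PartitionOffset]

    def to_json(self) -> str:
        return json.dumps({"partitions": [asdict(p) for p in self.partitions]})

    @classmethod
    def from_json(cls, s: str) -> "ListOffset":
        raw = json.loads(s)
        return cls([PartitionOffset(**p) for p in raw["partitions"]])


def write_with_length(payload: bytes, out: io.BytesIO) -> None:
    # Stand-in framing (assumption): 4-byte big-endian signed length, then the payload.
    out.write(struct.pack("!i", len(payload)))
    out.write(payload)


def read_utf8(inp: io.BytesIO) -> str:
    # Inverse of write_with_length: read the length prefix, then decode that many bytes.
    (length,) = struct.unpack("!i", inp.read(4))
    return inp.read(length).decode("utf-8")


# Round trip, mimicking how an offset would cross the JVM/Python boundary.
offset = ListOffset([PartitionOffset("events", 0, 42), PartitionOffset("events", 1, 7)])
buf = io.BytesIO()
write_with_length(offset.to_json().encode("utf-8"), buf)
buf.seek(0)
restored = ListOffset.from_json(read_utf8(buf))
print(restored == offset)  # True
```

The first option (allowing any JSON-compatible dict) also handles nested lists and dicts, but the conversion is lossy for some Python types: tuples come back as lists and non-string keys come back as strings, so `json.loads(json.dumps({1: (2, 3)}))` yields `{'1': [2, 3]}`. That is the compatibility constraint the linked conversion table implies, and it is one reason an explicit serde interface can be easier to get right for complex offsets.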
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on PR #45023: URL: https://github.com/apache/spark/pull/45023#issuecomment-1943113395 Could you please check the GA build result and fix accordingly?