This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new abf8770ffac7 [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write abf8770ffac7 is described below commit abf8770ffac7ac5f4dcd5b7b94b744b0267b34d9 Author: Chaoqin Li <chaoqin...@databricks.com> AuthorDate: Thu Feb 8 12:16:49 2024 +0900 [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write ### What changes were proposed in this pull request? Move PythonBatchWrite out of PythonWrite. ### Why are the changes needed? This is to prepare for supporting Python data source streaming writes in the future. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Trivial code refactoring; existing tests are sufficient. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45049 from chaoqin-li1123/python_sink. Authored-by: Chaoqin Li <chaoqin...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../datasources/v2/python/PythonWrite.scala | 34 +++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala index d216dfde9974..a10a18e43f64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala @@ -26,15 +26,32 @@ class PythonWrite( shortName: String, info: LogicalWriteInfo, isTruncate: Boolean - ) extends Write with BatchWrite { - private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + ) extends Write { + + override def toString: String = shortName + + override def toBatch: BatchWrite = new PythonBatchWrite(ds, shortName, info, 
isTruncate) + + override def description: String = "(Python)" + + override def supportedCustomMetrics(): Array[CustomMetric] = + ds.source.createPythonMetrics() +} + +class PythonBatchWrite( + ds: PythonDataSourceV2, + shortName: String, + info: LogicalWriteInfo, + isTruncate: Boolean + ) extends BatchWrite { // Store the pickled data source writer instance. private var pythonDataSourceWriter: Array[Byte] = _ - override def createBatchWriterFactory( - physicalInfo: PhysicalWriteInfo): DataWriterFactory = { + private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo): DataWriterFactory = + { val writeInfo = ds.source.createWriteInfoInPython( shortName, info.schema(), @@ -53,13 +70,4 @@ class PythonWrite( override def abort(messages: Array[WriterCommitMessage]): Unit = { ds.source.commitWriteInPython(pythonDataSourceWriter, messages, abort = true) } - - override def toString: String = shortName - - override def toBatch: BatchWrite = this - - override def description: String = "(Python)" - - override def supportedCustomMetrics(): Array[CustomMetric] = - ds.source.createPythonMetrics() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org