This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new abf8770ffac7 [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write
abf8770ffac7 is described below

commit abf8770ffac7ac5f4dcd5b7b94b744b0267b34d9
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Thu Feb 8 12:16:49 2024 +0900

    [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write
    
    ### What changes were proposed in this pull request?
    Move the BatchWrite implementation out of PythonWrite into a separate PythonBatchWrite class.
    
    ### Why are the changes needed?
    This is to prepare for supporting Python data source streaming write in the future (see the hypothetical sketch below).
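
    To make the intended direction concrete, here is a minimal hypothetical sketch
    (not part of this commit): the class name PythonStreamingWrite, its constructor
    shape, and its placement alongside PythonBatchWrite are assumptions; only the
    StreamingWrite contract it implements is Spark's actual DSv2 API.

        import org.apache.spark.sql.connector.write.{LogicalWriteInfo, PhysicalWriteInfo, WriterCommitMessage}
        import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}

        // Hypothetical streaming counterpart to PythonBatchWrite (assumed to live
        // in the same org.apache.spark.sql.execution.datasources.v2.python package).
        class PythonStreamingWrite(
            ds: PythonDataSourceV2,
            shortName: String,
            info: LogicalWriteInfo,
            isTruncate: Boolean
          ) extends StreamingWrite {

          // Creates the factory that Spark serializes and sends to executors.
          override def createStreamingWriterFactory(
              physicalInfo: PhysicalWriteInfo): StreamingDataWriterFactory = ???

          // Called once per epoch (micro-batch) with the commit messages of all tasks.
          override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ???

          override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ???
        }

    PythonWrite would then additionally override Write#toStreaming, mirroring toBatch:

        override def toStreaming: StreamingWrite =
          new PythonStreamingWrite(ds, shortName, info, isTruncate)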
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Trivial code refactoring; the existing tests are sufficient.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45049 from chaoqin-li1123/python_sink.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../datasources/v2/python/PythonWrite.scala        | 34 +++++++++++++---------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
index d216dfde9974..a10a18e43f64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
@@ -26,15 +26,32 @@ class PythonWrite(
     shortName: String,
     info: LogicalWriteInfo,
     isTruncate: Boolean
-  ) extends Write with BatchWrite {
-  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  ) extends Write {
+
+  override def toString: String = shortName
+
+  override def toBatch: BatchWrite = new PythonBatchWrite(ds, shortName, info, isTruncate)
+
+  override def description: String = "(Python)"
+
+  override def supportedCustomMetrics(): Array[CustomMetric] =
+    ds.source.createPythonMetrics()
+}
+
+class PythonBatchWrite(
+    ds: PythonDataSourceV2,
+    shortName: String,
+    info: LogicalWriteInfo,
+    isTruncate: Boolean
+  ) extends BatchWrite {
 
   // Store the pickled data source writer instance.
   private var pythonDataSourceWriter: Array[Byte] = _
 
-  override def createBatchWriterFactory(
-      physicalInfo: PhysicalWriteInfo): DataWriterFactory = {
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
+  override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo): DataWriterFactory =
+  {
     val writeInfo = ds.source.createWriteInfoInPython(
       shortName,
       info.schema(),
@@ -53,13 +70,4 @@ class PythonWrite(
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
     ds.source.commitWriteInPython(pythonDataSourceWriter, messages, abort = true)
   }
-
-  override def toString: String = shortName
-
-  override def toBatch: BatchWrite = this
-
-  override def description: String = "(Python)"
-
-  override def supportedCustomMetrics(): Array[CustomMetric] =
-    ds.source.createPythonMetrics()
 }
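
For context, a simplified sketch of the DSv2 call order this refactor leans on
(resolveBatchWrite is an illustrative helper, not Spark code): the planner builds
a single Write and only afterwards requests the execution-mode-specific interface,
which is why the BatchWrite logic can move into its own class.

    import org.apache.spark.sql.connector.catalog.SupportsWrite
    import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write}

    // Illustrative consumption order: build() yields a Write, and the mode-specific
    // interface is requested afterwards via toBatch (or, eventually, toStreaming).
    def resolveBatchWrite(table: SupportsWrite, info: LogicalWriteInfo): BatchWrite = {
      val write: Write = table.newWriteBuilder(info).build()
      // After this commit, for Python data sources this returns a separate
      // PythonBatchWrite rather than the Write object itself.
      write.toBatch
    }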

