This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 166d41a8f444 [SPARK-51147][SS] Refactor streaming related classes to a 
dedicated streaming directory
166d41a8f444 is described below

commit 166d41a8f4441d55c75e747107b3d6517dad1e8c
Author: bogao007 <[email protected]>
AuthorDate: Tue Feb 11 15:16:21 2025 +0900

    [SPARK-51147][SS] Refactor streaming related classes to a dedicated 
streaming directory
    
    ### What changes were proposed in this pull request?
    
    Refactor streaming classes under 
`sql/core/src/main/scala/org/apache/spark/sql/execution/python` to 
`sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming`.
    
    ### Why are the changes needed?
    
    Better organization for streaming related code base.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49867 from bogao007/streaming-python-dir.
    
    Authored-by: bogao007 <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
    (cherry picked from commit 394458fc1fa8d837fe65b6a6eef99191e08353fd)
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 python/pyspark/sql/streaming/readwriter.py                         | 3 ++-
 .../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 3 ++-
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala     | 3 ++-
 .../execution/datasources/v2/python/PythonMicroBatchStream.scala   | 3 +--
 .../{ => streaming}/ApplyInPandasWithStatePythonRunner.scala       | 7 ++++---
 .../python/{ => streaming}/ApplyInPandasWithStateWriter.scala      | 2 +-
 .../python/{ => streaming}/BaseStreamingArrowWriter.scala          | 2 +-
 .../{ => streaming}/FlatMapGroupsInPandasWithStateExec.scala       | 3 ++-
 .../sql/execution/python/{ => streaming}/PythonForeachWriter.scala | 3 ++-
 .../python/{ => streaming}/PythonStreamingSourceRunner.scala       | 2 +-
 .../{ => streaming}/TransformWithStateInPandasDeserializer.scala   | 2 +-
 .../python/{ => streaming}/TransformWithStateInPandasExec.scala    | 3 ++-
 .../{ => streaming}/TransformWithStateInPandasPythonRunner.scala   | 5 +++--
 .../{ => streaming}/TransformWithStateInPandasStateServer.scala    | 2 +-
 .../spark/sql/execution/streaming/IncrementalExecution.scala       | 2 +-
 .../spark/sql/execution/streaming/sources/ForeachWriterTable.scala | 2 +-
 .../python/{ => streaming}/BaseStreamingArrowWriterSuite.scala     | 2 +-
 .../python/{ => streaming}/PythonForeachWriterSuite.scala          | 4 ++--
 .../python/{ => streaming}/PythonStreamingDataSourceSuite.scala    | 3 ++-
 .../TransformWithStateInPandasStateServerSuite.scala               | 2 +-
 .../FlatMapGroupsInPandasWithStateDistributionSuite.scala          | 2 +-
 .../spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala  | 2 +-
 22 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/python/pyspark/sql/streaming/readwriter.py 
b/python/pyspark/sql/streaming/readwriter.py
index 69282dce37af..34af8cd9b070 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1562,7 +1562,8 @@ class DataStreamWriter:
         wrapped_func = _wrap_function(self._spark._sc, func, serializer, 
serializer)
         assert self._spark._sc._jvm is not None
         jForeachWriter = getattr(
-            self._spark._sc._jvm, 
"org.apache.spark.sql.execution.python.PythonForeachWriter"
+            self._spark._sc._jvm,
+            
"org.apache.spark.sql.execution.python.streaming.PythonForeachWriter",
         )(wrapped_func, self._df._jdf.schema())
         self._jwrite.foreach(jForeachWriter)
         return self
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 6e5298537ab4..f8fd56f38f9b 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -75,7 +75,8 @@ import 
org.apache.spark.sql.execution.command.{CreateViewCommand, ExternalComman
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, 
JDBCPartition, JDBCRelation}
 import 
org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource
-import org.apache.spark.sql.execution.python.{PythonForeachWriter, 
UserDefinedPythonFunction, UserDefinedPythonTableFunction}
+import org.apache.spark.sql.execution.python.{UserDefinedPythonFunction, 
UserDefinedPythonTableFunction}
+import org.apache.spark.sql.execution.python.streaming.PythonForeachWriter
 import org.apache.spark.sql.execution.stat.StatFunctions
 import 
org.apache.spark.sql.execution.streaming.GroupStateImpl.groupStateTimeoutFromString
 import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7d48d39109b9..89f86c347568 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{WriteFiles, WriteFilesExec}
 import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, 
REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, REPARTITION_BY_NUM, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.python._
+import 
org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec,
 TransformWithStateInPandasExec}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
@@ -828,7 +829,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case FlatMapGroupsInPandasWithState(
         func, groupAttr, outputAttr, stateType, outputMode, timeout, child) =>
         val stateVersion = 
conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
-        val execPlan = python.FlatMapGroupsInPandasWithStateExec(
+        val execPlan = FlatMapGroupsInPandasWithStateExec(
           func, groupAttr, outputAttr, stateType, None, stateVersion, 
outputMode, timeout,
           batchTimestampMs = None, eventTimeWatermarkForLateEvents = None,
           eventTimeWatermarkForEviction = None,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
index 0fc1df4cd1e9..b3ecfc8bb7f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming.{AcceptsLatestSeenOffset, 
MicroBatchStream, Offset}
 import 
org.apache.spark.sql.execution.datasources.v2.python.PythonMicroBatchStream.nextStreamId
-import org.apache.spark.sql.execution.python.PythonStreamingSourceRunner
+import 
org.apache.spark.sql.execution.python.streaming.PythonStreamingSourceRunner
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{PythonStreamBlockId, StorageLevel}
@@ -120,4 +120,3 @@ object PythonMicroBatchStream {
     currentId
   }
 }
-
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
similarity index 95%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 8832ac1ea5cb..193ec03841bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io._
 
@@ -36,8 +36,9 @@ import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
-import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{COUNT_COLUMN_SCHEMA_FROM_PYTHON_WORKER,
 InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
-import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.python.{PythonArrowInput, 
PythonArrowOutput, PythonUDFRunner}
+import 
org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStatePythonRunner.{COUNT_COLUMN_SCHEMA_FROM_PYTHON_WORKER,
 InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
similarity index 99%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
index db49be7fd99f..6bfa5440db37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import scala.jdk.CollectionConverters._
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
similarity index 98%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
index 303389cee096..73c70a618866 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
similarity index 98%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
index 76bb16443662..e53bdae813d2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import org.apache.spark.{JobArtifactSet, SparkException, 
SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
@@ -26,6 +26,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, Processing
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.execution.python.ArrowPythonRunner
 import org.apache.spark.sql.execution.python.PandasGroupUtils.resolveArgOffsets
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
similarity index 98%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 076bc72c431f..04c51c859bac 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.File
 import java.util.concurrent.TimeUnit
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.python.{EvaluatePython, HybridRowQueue}
 import 
org.apache.spark.sql.execution.streaming.sources.ForeachUserFuncException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
similarity index 99%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
index 11ab706e8abb..89273b7bc80f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
@@ -16,7 +16,7 @@
  */
 
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
similarity index 97%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
index 82d4978853cb..1a8ffb35c053 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.DataInputStream
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
similarity index 99%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
index c5a2a54c3f6a..e77035e31ccb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.util.UUID
 
@@ -34,6 +34,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator, 
SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.python.ArrowPythonRunner
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, 
groupAndProject, resolveArgOffsets}
 import 
org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl, 
StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric, 
StatefulOperatorPartitioning, StatefulOperatorStateInfo, 
StatefulProcessorHandleImpl, StateStoreWriter, TransformWithStateMetadataUtils, 
TransformWithStateVariableInfo, WatermarkSupport}
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
similarity index 97%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
index f415ae2543d3..c343f0b9f1e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.{DataInputStream, DataOutputStream}
 import java.net.ServerSocket
@@ -30,7 +30,8 @@ import org.apache.spark.api.python.{BasePythonRunner, 
ChainedPythonFunctions, Py
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.metric.SQLMetric
-import 
org.apache.spark.sql.execution.python.TransformWithStateInPandasPythonRunner.{GroupedInType,
 InType}
+import org.apache.spark.sql.execution.python.{BasicPythonArrowOutput, 
PythonArrowInput, PythonUDFRunner}
+import 
org.apache.spark.sql.execution.python.streaming.TransformWithStateInPandasPythonRunner.{GroupedInType,
 InType}
 import 
org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl, 
StatefulProcessorHandleImpl}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
similarity index 99%
rename from 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
index 1264616433cc..f665db8b5b12 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream, EOFException}
 import java.net.ServerSocket
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8fe6178744ac..f08268ec577c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.{LocalLimitExec, 
QueryExecution, Serialize
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
MergingSessionsExec, ObjectHashAggregateExec, SortAggregateExec, 
UpdatingSessionsExec}
 import 
org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
-import 
org.apache.spark.sql.execution.python.{FlatMapGroupsInPandasWithStateExec, 
TransformWithStateInPandasExec}
+import 
org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec,
 TransformWithStateInPandasExec}
 import 
org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
 import 
org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataReader, 
OperatorStateMetadataV1, OperatorStateMetadataV2, OperatorStateMetadataWriter, 
StateSchemaBroadcast, StateSchemaMetadata}
 import org.apache.spark.sql.internal.SQLConf
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index c0956a62e59f..4f3c9db4f7c3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, 
Table, TableCapabi
 import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, 
PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
 import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.execution.python.streaming.PythonForeachWriter
 import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
 import org.apache.spark.sql.types.StructType
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
similarity index 98%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
index 0417a839dc6b..2a7ff8c829f5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
similarity index 96%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
index a2d331836183..c1ecb904b496 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -28,7 +28,7 @@ import org.apache.spark._
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
-import 
org.apache.spark.sql.execution.python.PythonForeachWriter.UnsafeRowBuffer
+import 
org.apache.spark.sql.execution.python.streaming.PythonForeachWriter.UnsafeRowBuffer
 import org.apache.spark.sql.types.{DataType, IntegerType}
 import org.apache.spark.util.Utils
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
similarity index 99%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
index 3d91a045907f..e6e5ee62efeb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.File
 import java.util.concurrent.CountDownLatch
@@ -25,6 +25,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
 import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.execution.python.PythonDataSourceSuiteBase
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, 
OffsetSeqLog, ProcessingTimeTrigger}
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.types.StructType
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
similarity index 99%
rename from 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
rename to 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
index 402ed560edff..1f0aa72d2713 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
 
 import java.io.DataOutputStream
 import java.net.ServerSocket
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
index 433bc6b4380b..3320a682b124 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import org.apache.spark.sql.IntegratedUDFTestUtils.{shouldTestPandasUDFs, 
TestGroupedMapPandasUDFWithState}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
-import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
+import 
org.apache.spark.sql.execution.python.streaming.FlatMapGroupsInPandasWithStateExec
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import 
org.apache.spark.sql.streaming.util.{StatefulOpClusteredDistributionTestHelper, 
StreamManualClock}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
index 49825f7cde83..9c5714f68260 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.IntegratedUDFTestUtils._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.logical.{NoTimeout, 
ProcessingTimeTimeout}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Complete, 
Update}
-import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
+import 
org.apache.spark.sql.execution.python.streaming.FlatMapGroupsInPandasWithStateExec
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{lit, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to