This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 166d41a8f444 [SPARK-51147][SS] Refactor streaming-related classes to a
dedicated streaming directory
166d41a8f444 is described below
commit 166d41a8f4441d55c75e747107b3d6517dad1e8c
Author: bogao007 <[email protected]>
AuthorDate: Tue Feb 11 15:16:21 2025 +0900
[SPARK-51147][SS] Refactor streaming-related classes to a dedicated
streaming directory
### What changes were proposed in this pull request?
Refactor streaming classes under
`sql/core/src/main/scala/org/apache/spark/sql/execution/python` to
`sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming`.
### Why are the changes needed?
Better organization for the streaming-related code base.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49867 from bogao007/streaming-python-dir.
Authored-by: bogao007 <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 394458fc1fa8d837fe65b6a6eef99191e08353fd)
Signed-off-by: Jungtaek Lim <[email protected]>
---
python/pyspark/sql/streaming/readwriter.py | 3 ++-
.../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 3 ++-
.../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3 ++-
.../execution/datasources/v2/python/PythonMicroBatchStream.scala | 3 +--
.../{ => streaming}/ApplyInPandasWithStatePythonRunner.scala | 7 ++++---
.../python/{ => streaming}/ApplyInPandasWithStateWriter.scala | 2 +-
.../python/{ => streaming}/BaseStreamingArrowWriter.scala | 2 +-
.../{ => streaming}/FlatMapGroupsInPandasWithStateExec.scala | 3 ++-
.../sql/execution/python/{ => streaming}/PythonForeachWriter.scala | 3 ++-
.../python/{ => streaming}/PythonStreamingSourceRunner.scala | 2 +-
.../{ => streaming}/TransformWithStateInPandasDeserializer.scala | 2 +-
.../python/{ => streaming}/TransformWithStateInPandasExec.scala | 3 ++-
.../{ => streaming}/TransformWithStateInPandasPythonRunner.scala | 5 +++--
.../{ => streaming}/TransformWithStateInPandasStateServer.scala | 2 +-
.../spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
.../spark/sql/execution/streaming/sources/ForeachWriterTable.scala | 2 +-
.../python/{ => streaming}/BaseStreamingArrowWriterSuite.scala | 2 +-
.../python/{ => streaming}/PythonForeachWriterSuite.scala | 4 ++--
.../python/{ => streaming}/PythonStreamingDataSourceSuite.scala | 3 ++-
.../TransformWithStateInPandasStateServerSuite.scala | 2 +-
.../FlatMapGroupsInPandasWithStateDistributionSuite.scala | 2 +-
.../spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala | 2 +-
22 files changed, 35 insertions(+), 27 deletions(-)
diff --git a/python/pyspark/sql/streaming/readwriter.py
b/python/pyspark/sql/streaming/readwriter.py
index 69282dce37af..34af8cd9b070 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1562,7 +1562,8 @@ class DataStreamWriter:
wrapped_func = _wrap_function(self._spark._sc, func, serializer,
serializer)
assert self._spark._sc._jvm is not None
jForeachWriter = getattr(
- self._spark._sc._jvm,
"org.apache.spark.sql.execution.python.PythonForeachWriter"
+ self._spark._sc._jvm,
+
"org.apache.spark.sql.execution.python.streaming.PythonForeachWriter",
)(wrapped_func, self._df._jdf.schema())
self._jwrite.foreach(jForeachWriter)
return self
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 6e5298537ab4..f8fd56f38f9b 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -75,7 +75,8 @@ import
org.apache.spark.sql.execution.command.{CreateViewCommand, ExternalComman
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions,
JDBCPartition, JDBCRelation}
import
org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource
-import org.apache.spark.sql.execution.python.{PythonForeachWriter,
UserDefinedPythonFunction, UserDefinedPythonTableFunction}
+import org.apache.spark.sql.execution.python.{UserDefinedPythonFunction,
UserDefinedPythonTableFunction}
+import org.apache.spark.sql.execution.python.streaming.PythonForeachWriter
import org.apache.spark.sql.execution.stat.StatFunctions
import
org.apache.spark.sql.execution.streaming.GroupStateImpl.groupStateTimeoutFromString
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7d48d39109b9..89f86c347568 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{WriteFiles, WriteFilesExec}
import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL,
REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, REPARTITION_BY_NUM,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.python._
+import
org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec,
TransformWithStateInPandasExec}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
@@ -828,7 +829,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
case FlatMapGroupsInPandasWithState(
func, groupAttr, outputAttr, stateType, outputMode, timeout, child) =>
val stateVersion =
conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
- val execPlan = python.FlatMapGroupsInPandasWithStateExec(
+ val execPlan = FlatMapGroupsInPandasWithStateExec(
func, groupAttr, outputAttr, stateType, None, stateVersion,
outputMode, timeout,
batchTimestampMs = None, eventTimeWatermarkForLateEvents = None,
eventTimeWatermarkForEviction = None,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
index 0fc1df4cd1e9..b3ecfc8bb7f7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{AcceptsLatestSeenOffset,
MicroBatchStream, Offset}
import
org.apache.spark.sql.execution.datasources.v2.python.PythonMicroBatchStream.nextStreamId
-import org.apache.spark.sql.execution.python.PythonStreamingSourceRunner
+import
org.apache.spark.sql.execution.python.streaming.PythonStreamingSourceRunner
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{PythonStreamBlockId, StorageLevel}
@@ -120,4 +120,3 @@ object PythonMicroBatchStream {
currentId
}
}
-
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
similarity index 95%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 8832ac1ea5cb..193ec03841bd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io._
@@ -36,8 +36,9 @@ import
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.metric.SQLMetric
-import
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{COUNT_COLUMN_SCHEMA_FROM_PYTHON_WORKER,
InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
-import
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.python.{PythonArrowInput,
PythonArrowOutput, PythonUDFRunner}
+import
org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStatePythonRunner.{COUNT_COLUMN_SCHEMA_FROM_PYTHON_WORKER,
InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import
org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
import org.apache.spark.sql.execution.streaming.GroupStateImpl
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
similarity index 99%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
index db49be7fd99f..6bfa5440db37 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStateWriter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import scala.jdk.CollectionConverters._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
similarity index 98%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
index 303389cee096..73c70a618866 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
similarity index 98%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
index 76bb16443662..e53bdae813d2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/FlatMapGroupsInPandasWithStateExec.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import org.apache.spark.{JobArtifactSet, SparkException,
SparkUnsupportedOperationException, TaskContext}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
@@ -26,6 +26,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, Processing
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.execution.python.ArrowPythonRunner
import org.apache.spark.sql.execution.python.PandasGroupUtils.resolveArgOffsets
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
similarity index 98%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 076bc72c431f..04c51c859bac 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.File
import java.util.concurrent.TimeUnit
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.python.{EvaluatePython, HybridRowQueue}
import
org.apache.spark.sql.execution.streaming.sources.ForeachUserFuncException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
similarity index 99%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
index 11ab706e8abb..89273b7bc80f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
@@ -16,7 +16,7 @@
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
DataOutputStream}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
similarity index 97%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
index 82d4978853cb..1a8ffb35c053 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasDeserializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasDeserializer.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.DataInputStream
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
similarity index 99%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
index c5a2a54c3f6a..e77035e31ccb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasExec.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.util.UUID
@@ -34,6 +34,7 @@ import
org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.{BinaryExecNode, CoGroupedIterator,
SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.python.ArrowPythonRunner
import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython,
groupAndProject, resolveArgOffsets}
import
org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl,
StatefulOperatorCustomMetric, StatefulOperatorCustomSumMetric,
StatefulOperatorPartitioning, StatefulOperatorStateInfo,
StatefulProcessorHandleImpl, StateStoreWriter, TransformWithStateMetadataUtils,
TransformWithStateVariableInfo, WatermarkSupport}
import
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
similarity index 97%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
index f415ae2543d3..c343f0b9f1e9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasPythonRunner.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.{DataInputStream, DataOutputStream}
import java.net.ServerSocket
@@ -30,7 +30,8 @@ import org.apache.spark.api.python.{BasePythonRunner,
ChainedPythonFunctions, Py
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.metric.SQLMetric
-import
org.apache.spark.sql.execution.python.TransformWithStateInPandasPythonRunner.{GroupedInType,
InType}
+import org.apache.spark.sql.execution.python.{BasicPythonArrowOutput,
PythonArrowInput, PythonUDFRunner}
+import
org.apache.spark.sql.execution.python.streaming.TransformWithStateInPandasPythonRunner.{GroupedInType,
InType}
import
org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl,
StatefulProcessorHandleImpl}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
similarity index 99%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
index 1264616433cc..f665db8b5b12 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
DataOutputStream, EOFException}
import java.net.ServerSocket
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8fe6178744ac..f08268ec577c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.{LocalLimitExec,
QueryExecution, Serialize
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
MergingSessionsExec, ObjectHashAggregateExec, SortAggregateExec,
UpdatingSessionsExec}
import
org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
-import
org.apache.spark.sql.execution.python.{FlatMapGroupsInPandasWithStateExec,
TransformWithStateInPandasExec}
+import
org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec,
TransformWithStateInPandasExec}
import
org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
import
org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataReader,
OperatorStateMetadataV1, OperatorStateMetadataV2, OperatorStateMetadataWriter,
StateSchemaBroadcast, StateSchemaMetadata}
import org.apache.spark.sql.internal.SQLConf
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index c0956a62e59f..4f3c9db4f7c3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite,
Table, TableCapabi
import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo,
PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.python.PythonForeachWriter
+import org.apache.spark.sql.execution.python.streaming.PythonForeachWriter
import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
import org.apache.spark.sql.types.StructType
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
similarity index 98%
rename from
sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
rename to
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
index 0417a839dc6b..2a7ff8c829f5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BaseStreamingArrowWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/BaseStreamingArrowWriterSuite.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
similarity index 96%
rename from
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
rename to
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
index a2d331836183..c1ecb904b496 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriterSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import scala.collection.mutable.ArrayBuffer
@@ -28,7 +28,7 @@ import org.apache.spark._
import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeProjection}
-import
org.apache.spark.sql.execution.python.PythonForeachWriter.UnsafeRowBuffer
+import
org.apache.spark.sql.execution.python.streaming.PythonForeachWriter.UnsafeRowBuffer
import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.util.Utils
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
similarity index 99%
rename from
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
rename to
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
index 3d91a045907f..e6e5ee62efeb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.File
import java.util.concurrent.CountDownLatch
@@ -25,6 +25,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource,
shouldTestPandasUDFs}
import
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2,
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.execution.python.PythonDataSourceSuiteBase
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream,
OffsetSeqLog, ProcessingTimeTrigger}
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.types.StructType
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
similarity index 99%
rename from
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
rename to
sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
index 402ed560edff..1f0aa72d2713 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServerSuite.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.execution.python.streaming
import java.io.DataOutputStream
import java.net.ServerSocket
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
index 433bc6b4380b..3320a682b124 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateDistributionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
import org.apache.spark.sql.IntegratedUDFTestUtils.{shouldTestPandasUDFs,
TestGroupedMapPandasUDFWithState}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
-import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
+import
org.apache.spark.sql.execution.python.streaming.FlatMapGroupsInPandasWithStateExec
import org.apache.spark.sql.execution.streaming.MemoryStream
import
org.apache.spark.sql.streaming.util.{StatefulOpClusteredDistributionTestHelper,
StreamManualClock}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType,
StructField, StructType}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
index 49825f7cde83..9c5714f68260 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.IntegratedUDFTestUtils._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.{NoTimeout,
ProcessingTimeTimeout}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Complete,
Update}
-import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
+import
org.apache.spark.sql.execution.python.streaming.FlatMapGroupsInPandasWithStateExec
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{lit, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]