This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e359f210c45 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper
e359f210c45 is described below

commit e359f210c45abc17f0bcd32c9a86faf678caff75
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Thu Oct 19 14:15:43 2023 +0900

    [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper

    ### What changes were proposed in this pull request?
    A couple of minor improvements to `StreamingForeachBatchHelper`:

    * Make `RunnerCleaner` private and add ScalaDoc.
    * Update the contract for `pythonForeachBatchWrapper()` to document that the caller should eventually `close()` the returned `AutoCloseable` (a caller-side usage sketch follows the commit message).

    In addition, it fixes a flaky Protobuf unit test.

    ### Why are the changes needed?
    - Code readability improvement.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    - Existing tests.
    - For the Protobuf suite, verified with the seed set to '399'. It fails before this PR and passes after.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #43424 from rangadi/feb-scaladoc.

    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
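For illustration, a minimal, self-contained sketch of the caller-side contract described in the second bullet above. `ForeachBatchContractSketch`, `FakeRunner`, `startWorker`, and the string-based batch type are illustrative stand-ins only, not Spark Connect APIs; the real helper hands back the foreachBatch function together with an `AutoCloseable` that the caller must eventually close.

```scala
object ForeachBatchContractSketch {

  // Stand-in for the real (DataFrame, Long) => Unit foreachBatch function type.
  type ForeachBatchFnType = (String, Long) => Unit

  // Stand-in for a Python worker process that must be stopped explicitly.
  final class FakeRunner {
    @volatile var stopped = false
    def stop(): Unit = stopped = true
  }

  // Mirrors the shape of pythonForeachBatchWrapper: it returns the foreachBatch
  // function together with an AutoCloseable cleaner.
  def startWorker(): (ForeachBatchFnType, AutoCloseable) = {
    val runner = new FakeRunner
    val fn: ForeachBatchFnType = (batch, batchId) => println(s"batch $batchId: $batch")
    val cleaner = new AutoCloseable {
      override def close(): Unit = runner.stop()
    }
    (fn, cleaner)
  }

  def main(args: Array[String]): Unit = {
    val (fn, cleaner) = startWorker()
    try {
      fn("rows...", 0L)
      fn("rows...", 1L)
    } finally {
      // Per the updated contract, the caller must eventually close the cleaner
      // so that the worker process and related resources are released.
      cleaner.close()
    }
  }
}
```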
---
 .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala   | 2 +-
 .../spark/sql/connect/planner/StreamingForeachBatchHelper.scala  | 9 ++++++---
 .../sql/connect/service/SparkConnectSessionHodlerSuite.scala     | 4 +++-
 .../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +-
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index fa964c02a25..299f4f8830a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2927,7 +2927,7 @@ class SparkConnectPlanner(
     }
 
     // This is filled when a foreach batch runner started for Python.
-    var foreachBatchRunnerCleaner: Option[StreamingForeachBatchHelper.RunnerCleaner] = None
+    var foreachBatchRunnerCleaner: Option[AutoCloseable] = None
 
     if (writeOp.hasForeachBatch) {
       val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index b8097b23550..ce75ba3eb59 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
-  case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
+  // Visible for testing.
+  /** An AutoClosable to clean up resources on query termination. Stops Python worker. */
+  private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
     override def close(): Unit = {
       try runner.stop()
       catch {
@@ -98,11 +100,12 @@ object StreamingForeachBatchHelper extends Logging {
   /**
    * Starts up Python worker and initializes it with Python function. Returns a foreachBatch
    * function that sets up the session and Dataframe cache and and interacts with the Python
-   * worker to execute user's function.
+   * worker to execute user's function. In addition, it returns an AutoClosable. The caller must
+   * ensure it is closed so that worker process and related resources are released.
    */
   def pythonForeachBatchWrapper(
       pythonFn: SimplePythonFunction,
-      sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = {
+      sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = {
 
     val port = SparkConnectService.localPort
     val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
index a6451de8fc2..910c2a2650c 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.api.python.SimplePythonFunction
 import org.apache.spark.sql.IntegratedUDFTestUtils
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener, StreamingForeachBatchHelper}
+import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner
 import org.apache.spark.sql.test.SharedSparkSession
 
 class SparkConnectSessionHolderSuite extends SharedSparkSession {
@@ -206,7 +207,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
     sessionHolder.streamingForeachBatchRunnerCleanerCache
       .registerCleanerForQuery(query2, cleaner2)
 
-    val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+    val (runner1, runner2) =
+      (cleaner1.asInstanceOf[RunnerCleaner].runner, cleaner2.asInstanceOf[RunnerCleaner].runner)
 
     // assert both python processes are running
     assert(!runner1.isWorkerStopped().get)
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index d3e63a11a66..6135cb2d592 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -137,7 +137,7 @@ class ProtobufCatalystDataConversionSuite
     while (
       data != null &&
       (data.get(0) == defaultValue ||
-        (dt == BinaryType &&
+        (dt.fields(0).dataType == BinaryType &&
           data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
       data = generator().asInstanceOf[Row]

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org