[spark] branch master updated: [SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` to 1.70
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new aec34451297 [SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` to 1.70

aec34451297 is described below

commit aec3445129789c5b1d768333bacf3f3e680d73a0
Author: yangjie01
AuthorDate: Sat Jul 15 12:17:07 2023 -0500

    [SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` to 1.70

    ### What changes were proposed in this pull request?
    This PR aims to upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` from 1.60 to 1.70.

    ### Why are the changes needed?
    The new version fixes [CVE-2020-15522](https://github.com/bcgit/bc-java/wiki/CVE-2020-15522). The release notes are as follows:
    - https://www.bouncycastle.org/releasenotes.html#r1rv70

    ### Does this PR introduce _any_ user-facing change?
    No, this only upgrades a test dependency.

    ### How was this patch tested?
    Pass GitHub Actions.

    Closes #42015 from LuciferYang/SPARK-44441.

    Authored-by: yangjie01
    Signed-off-by: Sean Owen
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index eac34643fc9..3c2107b1b00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,7 +214,7 @@
     3.1.0
     1.1.0
     1.5.0
-    1.60
+    1.70
     1.9.0
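Since the commit only bumps a version property, one way to confirm which Bouncy Castle build actually lands on the classpath is to read the jar's implementation version at runtime. The sketch below is not from the commit; the provider class name (`org.bouncycastle.jce.provider.BouncyCastleProvider`) is real, but the helper itself is illustrative and assumes nothing beyond the JDK:

```scala
// Sketch: report the Implementation-Version of whatever jar provides a class,
// returning None when the class (e.g. bcprov) is not on the classpath at all.
object ClasspathVersion {
  def implementationVersion(className: String): Option[String] =
    try {
      // getPackage can be null for classes in the default package, and the
      // manifest version can be null, so wrap both lookups in Option.
      Option(Class.forName(className).getPackage)
        .flatMap(p => Option(p.getImplementationVersion))
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit = {
    val bc = implementationVersion("org.bouncycastle.jce.provider.BouncyCastleProvider")
    println(s"Bouncy Castle on classpath: ${bc.getOrElse("absent")}")
  }
}
```

After the upgrade this would be expected to report 1.70 in builds where `bcprov-jdk15on` is present.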
[spark] branch master updated: [SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if an error is reported
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5f73b405b25 [SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if an error is reported

5f73b405b25 is described below

commit 5f73b405b251f8cb39be80377e8cb2d2f960d490
Author: Anish Shrigondekar
AuthorDate: Sat Jul 15 22:33:28 2023 +0900

    [SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if an error is reported

    ### What changes were proposed in this pull request?
    Shut down the scheduled executor used for the maintenance task if an error is reported.

    ### Why are the changes needed?
    Without this change, the maintenance task threads are never cleaned up and the pointer to them is lost, so there is no way to shut down the scheduled executor. This could eventually lead to thread exhaustion. The change here fixes that issue.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Added unit tests:

    ```
    [info] - SPARK-44438: maintenance task should be shutdown on error (759 milliseconds)
    13:59:57.755 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite:
    = POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =
    [info] Run completed in 2 seconds, 255 milliseconds.
    [info] Total number of tests run: 1
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```

    Closes #42010 from anishshri-db/task/SPARK-44438.

    Authored-by: Anish Shrigondekar
    Signed-off-by: Jungtaek Lim
---
 .../sql/execution/streaming/state/StateStore.scala | 21 ++--
 .../streaming/state/StateStoreSuite.scala          | 56 ++
 2 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 30e660eb2ff..96c7b61f205 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -579,19 +579,25 @@ object StateStore extends Logging {
     loadedProviders.contains(storeProviderId)
   }
 
+  /** Check if maintenance thread is running and scheduled future is not done */
   def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
     maintenanceTask != null && maintenanceTask.isRunning
   }
 
+  /** Stop maintenance thread and reset the maintenance task */
+  def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+    if (maintenanceTask != null) {
+      maintenanceTask.stop()
+      maintenanceTask = null
+    }
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
     loadedProviders.keySet.foreach { key => unload(key) }
     loadedProviders.clear()
     _coordRef = null
-    if (maintenanceTask != null) {
-      maintenanceTask.stop()
-      maintenanceTask = null
-    }
+    stopMaintenanceTask()
     logInfo("StateStore stopped")
   }
 
@@ -602,7 +608,12 @@ object StateStore extends Logging {
       maintenanceTask = new MaintenanceTask(
         storeConf.maintenanceInterval,
         task = { doMaintenance() },
-        onError = { loadedProviders.synchronized { loadedProviders.clear() } }
+        onError = { loadedProviders.synchronized {
+          logInfo("Stopping maintenance task since an error was encountered.")
+          stopMaintenanceTask()
+          loadedProviders.clear()
+        }
+        }
       )
       logInfo("State Store maintenance task started")
     }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 9f8a588cc32..02aa12b325f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -47,6 +47,30 @@ import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
+class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
+  private var id: StateStoreId = null
+
+  override def init(
+      stateStoreId: StateStoreId,
+      keySchema: StructType,
+      valueSchema: StructType,
+      numColsPrefixKey: Int,
+      storeConfs:
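The shutdown-on-error pattern in the StateStore change above can be sketched without any Spark dependencies. This is a minimal, assumed implementation, not Spark's actual `MaintenanceTask` (which carries more state and logging); only the names `MaintenanceTask`, `isRunning`, `stop`, and `onError` are borrowed from the diff:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Sketch: a periodic task whose single-thread scheduled executor shuts itself
// down the first time the task throws, so the worker thread is not leaked.
class MaintenanceTask(periodMillis: Long, task: () => Unit, onError: () => Unit) {
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  private val runnable = new Runnable {
    def run(): Unit =
      try task()
      catch {
        case _: Throwable =>
          onError() // let the owner react, e.g. clear cached providers
          stop()    // shut the executor down so its thread is reclaimed
      }
  }

  // First run after one period, then repeatedly at the same period.
  private val future =
    executor.scheduleAtFixedRate(runnable, periodMillis, periodMillis, TimeUnit.MILLISECONDS)

  // After shutdown the periodic future is cancelled, so isDone becomes true.
  def isRunning: Boolean = !future.isDone

  def stop(): Unit = executor.shutdown()
}
```

Calling `stop()` from inside the failing run is safe: `shutdown()` lets the current execution finish and cancels the pending periodic reschedule, which is exactly the "stop on first error" behavior the commit adds.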
[spark] branch master updated: [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e5e2b914de6 [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit

e5e2b914de6 is described below

commit e5e2b914de6a498ae191bdb0d02308c5b6f13f15
Author: bartosz25
AuthorDate: Sat Jul 15 08:31:31 2023 -0500

    [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit

    ### What changes were proposed in this pull request?
    Improve the code comments:
    1. The rate micro-batch data source's Scaladoc parameters are not consistent with the options actually supported by this data source.
    2. `getCurrentWatermarkMs` has special semantics for the first micro-batch, when the watermark is not set yet. In my opinion it should return `Option[Long]`, hence `None` instead of `0` for the first micro-batch, but since that would be a breaking change, I preferred to add a note about it instead.

    ### Why are the changes needed?
    Avoid confusion while using the classes and methods.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    No tests were added because the change is only at the Scaladoc level.

    I affirm that the contribution is my original work and that I license the work to the project under the project's open source license.

    Closes #41988 from bartosz25/comments_fixes.

    Authored-by: bartosz25
    Signed-off-by: Sean Owen
---
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala | 4 ++--
 .../src/main/scala/org/apache/spark/sql/streaming/GroupState.scala  | 5 +
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index ccf8b0a7b92..41878a6a549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * with 0L.
  *
  * This source supports the following options:
- * - `rowsPerMicroBatch` (e.g. 100): How many rows should be generated per micro-batch.
+ * - `rowsPerBatch` (e.g. 100): How many rows should be generated per micro-batch.
  * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
  *   generated rows.
  * - `startTimestamp` (e.g. 1000, default: 0): starting value of generated time
- * - `advanceMillisPerMicroBatch` (e.g. 1000, default: 1000): the amount of time being advanced in
+ * - `advanceMillisPerBatch` (e.g. 1000, default: 1000): the amount of time being advanced in
  *   generated time on each micro-batch.
  *
  * Unlike `rate` data source, this data source provides a consistent set of input rows per

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index 2c8f1db74f8..f08a2fd3cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -315,6 +315,11 @@ trait GroupState[S] extends LogicalGroupState[S] {
  *
  * @note In a streaming query, this can be called only when watermark is set before calling
  *       `[map/flatMap]GroupsWithState`. In a batch query, this method always returns -1.
+ * @note The watermark gets propagated in the end of each query. As a result, this method will
+ *       return 0 (1970-01-01T00:00:00) for the first micro-batch. If you use this value
+ *       as a part of the timestamp set in the `setTimeoutTimestamp`, it may lead to the
+ *       state expiring immediately in the next micro-batch, once the watermark gets the
+ *       real value from your data.
  */
 @throws[UnsupportedOperationException](
     "if watermark has not been set before in [map|flatMap]GroupsWithState")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
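The pitfall described in the new `@note` can be avoided with a small guard in user code. The helper below is hypothetical (not part of the Spark API) and simply treats the sentinel value 0 from `getCurrentWatermarkMs()` as "watermark not yet set", so no timeout is derived from it during the first micro-batch:

```scala
// Sketch: derive a state timeout timestamp from the current watermark, but
// return None when the watermark is still the first-micro-batch sentinel (0),
// so the caller skips setTimeoutTimestamp instead of setting an already-past
// timestamp that would expire the state on the next micro-batch.
object WatermarkGuard {
  def timeoutFromWatermark(currentWatermarkMs: Long, timeoutDurationMs: Long): Option[Long] =
    if (currentWatermarkMs <= 0L) None
    else Some(currentWatermarkMs + timeoutDurationMs)
}
```

Inside a `mapGroupsWithState` function, the caller would then invoke `state.setTimeoutTimestamp` only when this returns `Some`, leaving the timeout unset for the first micro-batch.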