[spark] branch master updated: [SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` to 1.70

2023-07-15 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new aec34451297 [SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and 
`bcpkix-jdk15on` to 1.70
aec34451297 is described below

commit aec3445129789c5b1d768333bacf3f3e680d73a0
Author: yangjie01 
AuthorDate: Sat Jul 15 12:17:07 2023 -0500

[SPARK-44441][BUILD] Upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` to 1.70

### What changes were proposed in this pull request?
This PR aims to upgrade `bcprov-jdk15on` and `bcpkix-jdk15on` from 1.60 to 
1.70.

### Why are the changes needed?
The new version fixed 
[CVE-2020-15522](https://github.com/bcgit/bc-java/wiki/CVE-2020-15522).

The release notes are as follows:
- https://www.bouncycastle.org/releasenotes.html#r1rv70

### Does this PR introduce _any_ user-facing change?
No, this just upgrades a test dependency.

### How was this patch tested?
Pass GitHub Actions.

Closes #42015 from LuciferYang/SPARK-44441.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index eac34643fc9..3c2107b1b00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,7 +214,7 @@
 3.1.0
 1.1.0
 1.5.0
-1.60
+1.70
 1.9.0
 

[spark] branch master updated: [SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if an error is reported

2023-07-15 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5f73b405b25 [SPARK-44438][SS] Shutdown scheduled executor used for 
maintenance task if an error is reported
5f73b405b25 is described below

commit 5f73b405b251f8cb39be80377e8cb2d2f960d490
Author: Anish Shrigondekar 
AuthorDate: Sat Jul 15 22:33:28 2023 +0900

[SPARK-44438][SS] Shutdown scheduled executor used for maintenance task if 
an error is reported

### What changes were proposed in this pull request?
Shut down the scheduled executor used for the maintenance task if an error is 
reported.

### Why are the changes needed?
Without this change, the maintenance task threads would never be cleaned up: 
the reference to the scheduled executor is lost, so there is no way to shut it 
down, which could eventually lead to thread exhaustion. The change here fixes 
this issue.
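
For illustration, a minimal sketch of the pattern this commit applies, written 
against plain `java.util.concurrent` rather than Spark's internal API (all 
names here are illustrative):

```
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object MaintenanceLoop {
  private var executor: ScheduledExecutorService = null

  def start(task: () => Unit, onError: Throwable => Unit): Unit = synchronized {
    executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleAtFixedRate(() => {
      try task()
      catch {
        case t: Throwable =>
          onError(t)
          // Shut the scheduler down on error; otherwise the reference is
          // dropped and its thread lives on, eventually exhausting threads.
          stop()
      }
    }, 0L, 60L, TimeUnit.SECONDS)
  }

  def stop(): Unit = synchronized {
    if (executor != null) {
      executor.shutdown()
      executor = null
    }
  }
}
```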

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests

```
[info] - SPARK-44438: maintenance task should be shutdown on error (759 
milliseconds)
13:59:57.755 WARN 
org.apache.spark.sql.execution.streaming.state.StateStoreSuite:

= POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: rpc-boss-3-1 
(daemon=true), shuffle-boss-6-1 (daemon=true) =
[info] Run completed in 2 seconds, 255 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #42010 from anishshri-db/task/SPARK-44438.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../sql/execution/streaming/state/StateStore.scala | 21 ++--
 .../streaming/state/StateStoreSuite.scala  | 56 ++
 2 files changed, 72 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 30e660eb2ff..96c7b61f205 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -579,19 +579,25 @@ object StateStore extends Logging {
 loadedProviders.contains(storeProviderId)
   }
 
+  /** Check if maintenance thread is running and scheduled future is not done 
*/
   def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
 maintenanceTask != null && maintenanceTask.isRunning
   }
 
+  /** Stop maintenance thread and reset the maintenance task */
+  def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+if (maintenanceTask != null) {
+  maintenanceTask.stop()
+  maintenanceTask = null
+}
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
 loadedProviders.keySet.foreach { key => unload(key) }
 loadedProviders.clear()
 _coordRef = null
-if (maintenanceTask != null) {
-  maintenanceTask.stop()
-  maintenanceTask = null
-}
+stopMaintenanceTask()
 logInfo("StateStore stopped")
   }
 
@@ -602,7 +608,12 @@ object StateStore extends Logging {
 maintenanceTask = new MaintenanceTask(
   storeConf.maintenanceInterval,
   task = { doMaintenance() },
-  onError = { loadedProviders.synchronized { loadedProviders.clear() } 
}
+  onError = { loadedProviders.synchronized {
+  logInfo("Stopping maintenance task since an error was 
encountered.")
+  stopMaintenanceTask()
+  loadedProviders.clear()
+}
+  }
 )
 logInfo("State Store maintenance task started")
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 9f8a588cc32..02aa12b325f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -47,6 +47,30 @@ import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
+class FakeStateStoreProviderWithMaintenanceError extends StateStoreProvider {
+  private var id: StateStoreId = null
+
+  override def init(
+  stateStoreId: StateStoreId,
+  keySchema: StructType,
+  valueSchema: StructType,
+  numColsPrefixKey: Int,
+  storeConfs: 

[spark] branch master updated: [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit

2023-07-15 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e5e2b914de6 [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the 
semantic of getCurrentWatermarkMs explicit
e5e2b914de6 is described below

commit e5e2b914de6a498ae191bdb0d02308c5b6f13f15
Author: bartosz25 
AuthorDate: Sat Jul 15 08:31:31 2023 -0500

[MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of 
getCurrentWatermarkMs explicit

### What changes were proposed in this pull request?
Improve the code comments:
1. The rate micro-batch data source's Scaladoc parameters aren't consistent 
with the options actually supported by the data source (see the usage sketch 
after this list).
2. `getCurrentWatermarkMs` has special semantics for the first micro-batch, 
when the watermark is not set yet. IMO it should return `Option[Long]`, hence 
`None` instead of `0` for the first micro-batch, but since that would be a 
breaking change, I preferred to add a note on it instead.
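
For reference, a hedged usage sketch with the corrected option names 
(`rate-micro-batch` is the source's registered short name; `spark` is assumed 
to be an active `SparkSession`):

```
// Generate a deterministic 100 rows per micro-batch, advancing the
// generated event time by 1 second per batch.
val stream = spark.readStream
  .format("rate-micro-batch")
  .option("rowsPerBatch", 100)            // not `rowsPerMicroBatch`
  .option("advanceMillisPerBatch", 1000)  // not `advanceMillisPerMicroBatch`
  .option("startTimestamp", 0)
  .load()
```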

### Why are the changes needed?
1. Avoid confusion while using the classes and methods.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
No tests were added because the change is only at the Scaladoc level. I 
affirm that the contribution is my original work and that I license the work to 
the project under the project's open source license.

Closes #41988 from bartosz25/comments_fixes.

Authored-by: bartosz25 
Signed-off-by: Sean Owen 
---
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala  | 4 ++--
 .../src/main/scala/org/apache/spark/sql/streaming/GroupState.scala   | 5 +
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index ccf8b0a7b92..41878a6a549 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  *  with 0L.
  *
  *  This source supports the following options:
- *  - `rowsPerMicroBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
+ *  - `rowsPerBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
  *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The 
partition number for the
  *generated rows.
  *  - `startTimestamp` (e.g. 1000, default: 0): starting value of generated 
time
- *  - `advanceMillisPerMicroBatch` (e.g. 1000, default: 1000): the amount of 
time being advanced in
+ *  - `advanceMillisPerBatch` (e.g. 1000, default: 1000): the amount of time 
being advanced in
  *generated time on each micro-batch.
  *
  *  Unlike `rate` data source, this data source provides a consistent set of 
input rows per
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index 2c8f1db74f8..f08a2fd3cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -315,6 +315,11 @@ trait GroupState[S] extends LogicalGroupState[S] {
*
* @note In a streaming query, this can be called only when watermark is set 
before calling
*   `[map/flatMap]GroupsWithState`. In a batch query, this method 
always returns -1.
+   * @note The watermark gets propagated in the end of each query. As a 
result, this method will
+   *   return 0 (1970-01-01T00:00:00) for the first micro-batch. If you 
use this value
+   *   as a part of the timestamp set in the `setTimeoutTimestamp`, it may 
lead to the
+   *   state expiring immediately in the next micro-batch, once the 
watermark gets the
+   *   real value from your data.
*/
   @throws[UnsupportedOperationException](
 "if watermark has not been set before in [map|flatMap]GroupsWithState")

