[spark] branch master updated: [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition

2023-10-25 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7d7afb06f68 [SPARK-38723][SS][TESTS] Add test for streaming query 
resume race condition
7d7afb06f68 is described below

commit 7d7afb06f682c10f3900eb8adeab9fad6d49cb24
Author: Phil Dakin 
AuthorDate: Thu Oct 26 14:24:09 2023 +0900

[SPARK-38723][SS][TESTS] Add test for streaming query resume race condition

### What changes were proposed in this pull request?
Add a test for the CONCURRENT_QUERY error raised when multiple sessions try 
to resume the same streaming query from its checkpoint simultaneously.
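For orientation, a minimal sketch of the race the test exercises, outside the test harness; the SparkSession `spark` and the paths are hypothetical, and the second `start` only fails when the two runs actually race:
```scala
// Two writers resume from the same checkpoint location. When they race, one of them
// may fail with a SparkConcurrentModificationException (error class CONCURRENT_QUERY).
val ds = spark.readStream.format("rate").load()
val q1 = ds.writeStream.format("parquet")
  .option("checkpointLocation", "/tmp/chk").start("/tmp/data")
val q2 = ds.writeStream.format("parquet")
  .option("checkpointLocation", "/tmp/chk").start("/tmp/data")
```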

### Why are the changes needed?
Improve testing coverage per 
https://issues.apache.org/jira/browse/SPARK-38723.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The change is itself a test; ran it locally and confirmed the suite passes.
```
[info] All tests passed.
[success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43405 from PhilDakin/pdakin.SPARK-38723.

Authored-by: Phil Dakin 
Signed-off-by: Jungtaek Lim 
---
 .../sql/errors/QueryExecutionErrorsSuite.scala | 48 ++
 1 file changed, 48 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 78bbabb1a3f..fb1d05f2a9a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -25,6 +25,7 @@ import java.util.{Locale, Properties, 
ServiceConfigurationError}
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.mockito.Mockito.{mock, spy, when}
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, 
Row, SaveMode}
@@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DataType, DecimalType, LongType, 
MetadataBuilder, StructType}
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils
 
 class QueryExecutionErrorsSuite
@@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite
 assert(e.getCause.isInstanceOf[NullPointerException])
   }
 
+  test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
+failAfter(90 seconds) {
+  withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+withTempDir { dir =>
+  val ds = spark.readStream.format("rate").load()
+
+  // Queries have the same ID when they are resumed from the same 
checkpoint.
+  val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+  val dataLocation = new File(dir, "data").getCanonicalPath
+
+  // Run an initial query to setup the checkpoint.
+  val initialQuery = ds.writeStream.format("parquet")
+.option("checkpointLocation", chkLocation).start(dataLocation)
+
+  // Error is thrown due to a race condition. Ensure it happens with 
high likelihood in the
+  // test by spawning many threads.
+  val exceptions = ThreadUtils.parmap(Seq.range(1, 50), 
"QueryExecutionErrorsSuite", 50)
+{ _ =>
+  var exception = None : 
Option[SparkConcurrentModificationException]
+  try {
+val restartedQuery = ds.writeStream.format("parquet")
+  .option("checkpointLocation", 
chkLocation).start(dataLocation)
+restartedQuery.stop()
+restartedQuery.awaitTermination()
+  } catch {
+case e: SparkConcurrentModificationException =>
+  exception = Some(e)
+  }
+  exception
+}
+  assert(exceptions.map(e => e.isDefined).reduceLeft(_ || _))
+  exceptions.map { e =>
+if (e.isDefined) {
+  checkError(
+e.get,
+errorClass = "CONCURRENT_QUERY",
+sqlState = Some("0A000")
+  )
+}
+  }
+  spark.streams.active.foreach(_.stop())
+}
+  }
+}
+  }
+
   test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") {
 withTable("t") {
   sql("CREATE TABLE t(c String) USING parquet")



[spark] branch master updated: [SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft`

2023-10-25 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3146c83d98 [SPARK-45663][CORE][MLLIB] Replace 
`IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft`
a3146c83d98 is described below

commit a3146c83d98fe76aeb6880a40b61fcdd257685ce
Author: yangjie01 
AuthorDate: Thu Oct 26 13:20:56 2023 +0800

[SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with 
`IterableOnceOps#foldLeft`

### What changes were proposed in this pull request?
This PR replaces `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft` 
because `aggregate` has been marked as deprecated since Scala 2.13.0.

```scala
  deprecated("`aggregate` is not relevant for sequential collections. Use 
`foldLeft(z)(seqop)` instead.", "2.13.0")
  def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
foldLeft(z)(seqop)
```
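A minimal before/after sketch of the mechanical replacement, with illustrative values only:
```scala
val xs = Iterator(1, 2, 3)
// Deprecated since 2.13; for a sequential iterator the combop is never used:
// val sum = xs.aggregate(0)(_ + _, _ + _)
val sum = xs.foldLeft(0)(_ + _)  // same result: 6
```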

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43527 from LuciferYang/SPARK-45663.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   | 5 ++---
 .../scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala | 2 +-
 .../scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala | 5 ++---
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index c6770c77b92..5dc666c62d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1219,8 +1219,7 @@ abstract class RDD[T: ClassTag](
 // Clone the zero value since we will also be serializing it as part of 
tasks
 var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
 val cleanSeqOp = sc.clean(seqOp)
-val cleanCombOp = sc.clean(combOp)
-val aggregatePartition = (it: Iterator[T]) => 
it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+val aggregatePartition = (it: Iterator[T]) => 
it.foldLeft(zeroValue)(cleanSeqOp)
 val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, 
taskResult)
 sc.runJob(this, aggregatePartition, mergeResult)
 jobResult
@@ -1258,7 +1257,7 @@ abstract class RDD[T: ClassTag](
   val cleanSeqOp = context.clean(seqOp)
   val cleanCombOp = context.clean(combOp)
   val aggregatePartition =
-(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+(it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
   var partiallyAggregated: RDD[U] = mapPartitions(it => 
Iterator(aggregatePartition(it)))
   var numPartitions = partiallyAggregated.partitions.length
   val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / 
depth)).toInt, 2)
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
index ce46fc8f201..f08cf44e4e1 100644
--- 
a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala
@@ -69,7 +69,7 @@ private[spark] object StratifiedSamplingUtils extends Logging 
{
   val rng = new RandomDataGenerator()
   rng.reSeed(seed + partition)
   val seqOp = getSeqOp(withReplacement, fractions, rng, counts)
-  Iterator(iter.aggregate(zeroU)(seqOp, combOp))
+  Iterator(iter.foldLeft(zeroU)(seqOp))
 }
 mappedPartitionRDD.reduce(combOp)
   }
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
index cbe2776f664..2b86c7cd344 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
@@ -78,9 +78,8 @@ private[evaluation] object AreaUnderCurve {
* @param curve an iterator over ordered 2D points stored in pairs 
representing a curve
*/
   def of(curve: Iterable[(Double, Double)]): Double = {
-    curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
-      seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
-      combop = _ + _
+    curve.iterator.sliding(2).withPartial(false).foldLeft(0.0)(
+      op = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points)
     )
   }
 }



[spark] branch master updated: [SPARK-44407][BUILD][TESTS] Clean up the compilation warnings related to `it will become a keyword in Scala 3` and prohibit use these keywords as variable name

2023-10-25 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 35c628d9b27 [SPARK-44407][BUILD][TESTS] Clean up the compilation 
warnings related to `it will become a keyword in Scala 3` and prohibit use 
these keywords as variable name
35c628d9b27 is described below

commit 35c628d9b27aee9263bf43254cd839da69da9f28
Author: yangjie01 
AuthorDate: Thu Oct 26 13:18:23 2023 +0800

[SPARK-44407][BUILD][TESTS] Clean up the compilation warnings related to 
`it will become a keyword in Scala 3` and prohibit use these keywords as 
variable name

### What changes were proposed in this pull request?
This PR cleans up the compilation warnings related to `it will become a 
keyword in Scala 3`. Additionally, to facilitate future Scala version 
migration, a new compiler option has been added to prohibit the use of these 
keywords as variable names.

### Why are the changes needed?
There are some identifiers, such as `enum`, `given`, and `export`, that will 
become keywords in Scala 3. Using them as variable names triggers a compilation 
warning in Scala 2.13, but becomes a compilation error in Scala 3.

**Scala 2.13**

```
Welcome to Scala 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.8).
Type in expressions for evaluation. Or try :help.

scala> val enum: Int = 1
   ^
   warning: Wrap `enum` in backticks to use it as an identifier, it 
will become a keyword in Scala 3. [quickfixable]
val enum: Int = 1

scala> val export: Int = 1
   ^
   warning: Wrap `export` in backticks to use it as an identifier, it 
will become a keyword in Scala 3. [quickfixable]
val export: Int = 1

scala> val given: Int = 1
   ^
   warning: Wrap `given` in backticks to use it as an identifier, it 
will become a keyword in Scala 3. [quickfixable]
val given: Int = 1
```

**Scala 3**

```
Welcome to Scala 3.3.1 (17.0.8, Java OpenJDK 64-Bit Server VM).
Type in expressions for evaluation. Or try :help.

scala> val enum: Int = 1
-- [E032] Syntax Error: 

1 |val enum: Int = 1
  |
  |pattern expected
  |
  | longer explanation available when compiling with `-explain`

scala> val export: Int = 1
-- [E032] Syntax Error: 

1 |val export: Int = 1
  |^^
  |pattern expected
  |
  | longer explanation available when compiling with `-explain`

scala> val given: Int = 1
-- [E040] Syntax Error: 

1 |val given: Int = 1
  | ^
  | an identifier expected, but ':' found
  |
  | longer explanation available when compiling with `-explain`
```
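For reference, the backtick escape suggested by the 2.13 quickfix keeps such a name usable in both versions; a small illustrative sketch, not code from this PR:
```scala
// Wrapping the soon-to-be keywords in backticks lets them remain identifiers
// in both Scala 2.13 and Scala 3.
val `enum`: Int = 1
val `export`: Int = 2
val `given`: Int = 3
```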

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43529 from LuciferYang/SPARK-44407.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 pom.xml | 6 ++
 project/SparkBuild.scala| 6 +-
 .../org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala  | 4 ++--
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index e3f3b2fe9a1..6488918326f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3008,6 +3008,12 @@
 SPARK-45627 Symbol literals are deprecated in Scala 2.13 and 
it's a compile error in Scala 3.
   -->
   -Wconf:cat=deprecationmsg=symbol literal is 
deprecated:e
+  
+  -Wconf:cat=deprecationmsg=it will become a keyword in 
Scala 3:e
 
 
   -Xss128m
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6e87cab6df8..098a628ba1c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -265,7 +265,11 @@ object SparkBuild extends PomBuild {
 // Or use `-Wconf:msg=legacy-binding:s` to silence this warning. 
[quickfixable]"
 "-Wconf:msg=legacy-binding:s",
 // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a 
compile error in Scala 3.
-"-Wconf:cat=deprecation=symbol literal is deprecated:e"
+"-Wconf:cat=deprecation=symbol literal is deprecated:e",
+// SPARK-45627 `enum`, `export` and `given` will become keywords in 
Scala 3,
+// so they are prohibited from being used as variable names in Scala 
2.13 to
+// reduce the cost of migration in subsequent 

[spark] branch master updated: [SPARK-45665][INFRA] Uses different `ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches

2023-10-25 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8cdcfd262f9 [SPARK-45665][INFRA] Uses different 
`ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches
8cdcfd262f9 is described below

commit 8cdcfd262f9fd46fb9a8e1ceb0bccefe452582bd
Author: yangjie01 
AuthorDate: Thu Oct 26 11:22:41 2023 +0800

[SPARK-45665][INFRA] Uses different `ORACLE_DOCKER_IMAGE_NAME` in the 
scheduled builds in other branches

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43123 upgraded the Oracle version used 
for testing. Because the daily tests reuse `build_and_test.yml`, the 
branch-3.x builds also pick up the new version. However, since the 
corresponding code changes were not backported, the `OracleIntegrationSuite` 
in the `Docker integration tests` cannot pass during the daily tests of branch-3.x:

- branch-3.3: 
https://github.com/apache/spark/actions/runs/6609791712/job/17950549755
- branch-3.4: 
https://github.com/apache/spark/actions/runs/6611049884/job/17954225189
- branch-3.5: 
https://github.com/apache/spark/actions/runs/6612344747/job/17958021656

So this PR adds the ORACLE_DOCKER_IMAGE_NAME environment variable to the 
daily test yml files for branch-3.x and uses the previous version of the Oracle 
Docker image, which passes the test successfully.

### Why are the changes needed?
Restore the daily test for branch-3.x.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Monitor the daily test results.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43496 from LuciferYang/oracl-docker-image-name.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: yangjie01 
---
 .github/workflows/build_and_test.yml | 1 +
 .github/workflows/build_branch33.yml | 3 ++-
 .github/workflows/build_branch34.yml | 3 ++-
 .github/workflows/build_branch35.yml | 3 ++-
 4 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 13e751bcaa7..5825185f344 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -982,6 +982,7 @@ jobs:
 distribution: zulu
 java-version: ${{ inputs.java }}
 - name: Run tests
+  env: ${{ fromJSON(inputs.envs) }}
   run: |
 ./dev/run-tests --parallelism 1 --modules docker-integration-tests 
--included-tags org.apache.spark.tags.DockerTest
 - name: Upload test results to report
diff --git a/.github/workflows/build_branch33.yml 
b/.github/workflows/build_branch33.yml
index 63a950447f5..fc6ce7028fc 100644
--- a/.github/workflows/build_branch33.yml
+++ b/.github/workflows/build_branch33.yml
@@ -37,7 +37,8 @@ jobs:
   envs: >-
 {
   "SCALA_PROFILE": "scala2.13",
-  "PYTHON_TO_TEST": ""
+  "PYTHON_TO_TEST": "",
+  "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0"
 }
   jobs: >-
 {
diff --git a/.github/workflows/build_branch34.yml 
b/.github/workflows/build_branch34.yml
index 740b68b69e3..deb43d82c97 100644
--- a/.github/workflows/build_branch34.yml
+++ b/.github/workflows/build_branch34.yml
@@ -37,7 +37,8 @@ jobs:
   envs: >-
 {
   "SCALA_PROFILE": "scala2.13",
-  "PYTHON_TO_TEST": ""
+  "PYTHON_TO_TEST": "",
+  "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0"
 }
   jobs: >-
 {
diff --git a/.github/workflows/build_branch35.yml 
b/.github/workflows/build_branch35.yml
index 15ee66a9ce4..9e6fe13c020 100644
--- a/.github/workflows/build_branch35.yml
+++ b/.github/workflows/build_branch35.yml
@@ -37,7 +37,8 @@ jobs:
   envs: >-
 {
   "SCALA_PROFILE": "scala2.13",
-  "PYTHON_TO_TEST": ""
+  "PYTHON_TO_TEST": "",
+  "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0"
 }
   jobs: >-
 {





[spark] branch master updated (dcbe275543e -> 25c74d0d4e2)

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from dcbe275543e [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from 
Pandas API on Spark
 add 25c74d0d4e2 [SPARK-45635][PYTHON][TESTS] Cleanup unused import for 
PySpark testing

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/tests/computation/test_corrwith.py | 1 -
 python/pyspark/pandas/tests/data_type_ops/test_num_arithmetic.py | 1 -
 python/pyspark/pandas/tests/data_type_ops/testing_utils.py   | 1 -
 python/pyspark/pandas/tests/frame/test_constructor.py| 1 -
 python/pyspark/pandas/tests/groupby/test_groupby.py  | 2 +-
 python/pyspark/pandas/tests/indexes/test_base_slow.py| 2 +-
 python/pyspark/pandas/tests/indexes/test_datetime_property.py| 1 -
 python/pyspark/pandas/tests/series/test_as_type.py   | 1 -
 python/pyspark/pandas/tests/series/test_cumulative.py| 1 -
 python/pyspark/pandas/tests/series/test_series.py| 9 +
 python/pyspark/pandas/tests/test_dataframe_spark_io.py   | 2 --
 python/pyspark/pandas/tests/test_frame_resample.py   | 7 ---
 python/pyspark/pandas/tests/test_ops_on_diff_frames.py   | 2 --
 python/pyspark/pandas/tests/test_resample.py | 2 +-
 python/pyspark/pandas/tests/test_series_resample.py  | 7 ---
 python/pyspark/pandas/tests/test_spark_functions.py  | 5 -
 python/pyspark/testing/connectutils.py   | 1 -
 python/pyspark/testing/utils.py  | 2 +-
 18 files changed, 5 insertions(+), 43 deletions(-)





[spark] branch master updated: [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on Spark

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dcbe275543e [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from 
Pandas API on Spark
dcbe275543e is described below

commit dcbe275543e05cb4529317ddb933d09253d65d6f
Author: Haejoon Lee 
AuthorDate: Thu Oct 26 11:16:36 2023 +0900

[SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on 
Spark

### What changes were proposed in this pull request?

This PR proposes to remove the old API `get_dtype_counts` from the Pandas API on 
Spark.

### Why are the changes needed?

This API was deprecated a long time ago but had not been removed because it 
was used internally in our code base. It is no longer used anywhere now.

### Does this PR introduce _any_ user-facing change?

`DataFrame.get_dtype_counts` is removed.

### How was this patch tested?

No new test is required for API removal. The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43488 from itholic/SPARK-45634.

Authored-by: Haejoon Lee 
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_upgrade.rst |  1 +
 python/pyspark/pandas/generic.py   | 51 --
 2 files changed, 1 insertion(+), 51 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 933fa936f70..20fab578504 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -53,6 +53,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``col_space`` parameter from ``DataFrame.to_latex`` and 
``Series.to_latex`` has been removed from pandas API on Spark.
 * In Spark 4.0, ``DataFrame.to_spark_io`` has been removed from pandas API on 
Spark, use ``DataFrame.spark.to_spark_io`` instead.
 * In Spark 4.0, ``Series.is_monotonic`` and ``Index.is_monotonic`` have been 
removed from pandas API on Spark, use ``Series.is_monotonic_increasing`` or 
``Index.is_monotonic_increasing`` instead respectively.
+* In Spark 4.0, ``DataFrame.get_dtype_counts`` has been removed from pandas 
API on Spark, use ``DataFrame.dtypes.value_counts()`` instead.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index c6f1b9ccbb7..16eaeb6142e 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -19,13 +19,11 @@
 A base class of DataFrame/Column to behave like pandas DataFrame/Series.
 """
 from abc import ABCMeta, abstractmethod
-from collections import Counter
 from functools import reduce
 from typing import (
 Any,
 Callable,
 Dict,
-Iterable,
 IO,
 List,
 Optional,
@@ -400,55 +398,6 @@ class Frame(object, metaclass=ABCMeta):
 """
 return self._apply_series_op(lambda psser: psser._cumprod(skipna), 
should_resolve=True)
 
-# TODO: Although this has removed pandas >= 1.0.0, but we're keeping this 
as deprecated
-# since we're using this for `DataFrame.info` internally.
-# We can drop it once our minimal pandas version becomes 1.0.0.
-def get_dtype_counts(self) -> pd.Series:
-"""
-Return counts of unique dtypes in this object.
-
-.. deprecated:: 0.14.0
-
-Returns
----
-dtype: pd.Series
-Series with the count of columns with each dtype.
-
-See Also
-
-dtypes: Return the dtypes in this object.
-
-Examples
-
->>> a = [['a', 1, 1], ['b', 2, 2], ['c', 3, 3]]
->>> df = ps.DataFrame(a, columns=['str', 'int1', 'int2'])
->>> df
-  str  int1  int2
-0   a 1 1
-1   b 2 2
-2   c 3 3
-
->>> df.get_dtype_counts().sort_values()
-object1
-int64 2
-dtype: int64
-
->>> df.str.get_dtype_counts().sort_values()
-object1
-dtype: int64
-"""
-warnings.warn(
-"`get_dtype_counts` has been deprecated and will be "
-"removed in a future version. For DataFrames use "
-"`.dtypes.value_counts()",
-FutureWarning,
-)
-if not isinstance(self.dtypes, Iterable):
-dtypes = [self.dtypes]
-else:
-dtypes = list(self.dtypes)
-return pd.Series(dict(Counter([d.name for d in dtypes])))
-
 def pipe(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
 r"""
 Apply func(self, \*args, \*\*kwargs).



[spark] branch master updated: [SPARK-45651][BUILD] Log memory usage of publish snapshot workflow

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e57a16d7af1 [SPARK-45651][BUILD] Log memory usage of publish snapshot 
workflow
e57a16d7af1 is described below

commit e57a16d7af14d1ce0c14d01dd220c63acb98517d
Author: Enrico Minack 
AuthorDate: Thu Oct 26 10:39:32 2023 +0900

[SPARK-45651][BUILD] Log memory usage of publish snapshot workflow

### What changes were proposed in this pull request?
This logs memory consumption while publishing snapshots. This is to 
investigate whether the suspected high memory usage is the root cause of 
`publish_snapshots` failures for master.

Merging this after #43512 allows running it manually.

### Why are the changes needed?
The working assumption is that high memory usage is the root cause. This 
logging should confirm or refute that assumption. It can be reverted once 
more is known or SPARK-45651 is fixed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Locally

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43513 from EnricoMi/publish-snapshot-log-memory.

Authored-by: Enrico Minack 
Signed-off-by: Hyukjin Kwon 
---
 .github/workflows/publish_snapshot.yml | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/publish_snapshot.yml 
b/.github/workflows/publish_snapshot.yml
index 6d55f1afed0..3354ab88a39 100644
--- a/.github/workflows/publish_snapshot.yml
+++ b/.github/workflows/publish_snapshot.yml
@@ -70,4 +70,12 @@ jobs:
 GPG_KEY: "not_used"
 GPG_PASSPHRASE: "not_used"
 GIT_REF: ${{ matrix.branch }}
-  run: ./dev/create-release/release-build.sh publish-snapshot
+  run: |
+while true
+do
+  date
+  top -b -n 1 -i
+  sleep 1
+  echo
+done | sed "s/^/mem: /" &
+./dev/create-release/release-build.sh publish-snapshot





[spark] branch master updated: [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support

2023-10-25 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3e2b146eb81 [SPARK-45136][CONNECT] Enhance ClosureCleaner with 
Ammonite support
3e2b146eb81 is described below

commit 3e2b146eb81d9a5727f07b58f7bb1760a71a8697
Author: Vsevolod Stepanov 
AuthorDate: Wed Oct 25 21:35:07 2023 -0400

[SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support

### What changes were proposed in this pull request?
This PR enhances the existing ClosureCleaner implementation to support cleaning 
closures defined in Ammonite. Please refer to [this 
gist](https://gist.github.com/vsevolodstep-db/b8e4d676745d6e2d047ecac291e5254c) 
for more context on how Ammonite code wrapping works and what problems I'm 
trying to solve here.

Overall, it contains these logical changes in `ClosureCleaner`:
1. Making it recognize and clean closures defined in Ammonite (previously it 
checked whether the capturing class name starts with `$line` and ends with 
`$iw`, which is specific to the native Scala REPL).
2. Making it clean closures if they are defined inside a user class in a 
REPL (see corner case 1 in the gist).
3. Making it clean nested closures properly for the Ammonite REPL (see corner 
case 2 in the gist).
4. Making it transitively follow other Ammonite commands that are captured 
by the target closure.

Please note that the `cleanTransitively` option of the `ClosureCleaner.clean()` 
method refers to following references transitively within the enclosing command 
object; it doesn't follow other command objects.

As we need `ClosureCleaner` to be available in Spark Connect, I also moved 
the implementation to the `common-utils` module. This brings in a new 
`xbean-asm9-shaded` dependency, which is fairly small.

Also, this PR moves the `checkSerializable` check from `ClosureCleaner` to 
`SparkClosureCleaner`, as it is specific to Spark core.

The important changes affect `ClosureCleaner` only. They should not affect the 
existing code path for normal Scala closures or closures defined in the native 
Scala REPL, and cover only closures defined in Ammonite.

Also, this PR modifies Spark Connect's `UserDefinedFunction` to actually 
use `ClosureCleaner` and clean closures in Spark Connect.

### Why are the changes needed?
To properly support closures defined in Ammonite, reduce UDF payload size, 
and avoid possible `NonSerializable` exceptions. This includes:
- a lambda capturing the outer command object, leading to a circular dependency
- a lambda capturing other command objects transitively, exploding the payload 
size (see the sketch after this list)
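As a rough illustration of the payload problem (hypothetical names, not code from this PR), a lambda defined inside a REPL command wrapper can drag the whole wrapper instance into the serialized closure:
```scala
// Hypothetical stand-in for a REPL command wrapper class.
class Cmd1 {
  val bigLookup: Map[Int, String] = (1 to 100000).map(i => i -> i.toString).toMap
  val threshold = 10
  // The lambda only needs `threshold`, but it is compiled against `this`, so serializing it
  // can pull the whole Cmd1 instance (including bigLookup) into the UDF payload unless a
  // closure cleaner rewrites or nulls the unused outer references.
  val isSmall: Int => Boolean = x => x < threshold
}
```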

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.
New tests in `ReplE2ESuite` covering various scenarios using SparkConnect + 
Ammonite REPL to make sure closures are actually cleaned.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42995 from vsevolodstep-db/SPARK-45136/closure-cleaner.

Authored-by: Vsevolod Stepanov 
Signed-off-by: Herman van Hovell 
---
 common/utils/pom.xml   |   4 +
 .../org/apache/spark/util/ClosureCleaner.scala | 636 ++---
 .../org/apache/spark/util/SparkStreamUtils.scala   | 109 
 .../sql/expressions/UserDefinedFunction.scala  |  10 +-
 .../spark/sql/application/ReplE2ESuite.scala   | 143 +
 .../CheckConnectJvmClientCompatibility.scala   |   8 +
 core/pom.xml   |   4 -
 .../main/scala/org/apache/spark/SparkContext.scala |   2 +-
 .../apache/spark/util/SparkClosureCleaner.scala|  49 ++
 .../main/scala/org/apache/spark/util/Utils.scala   |  85 +--
 .../apache/spark/util/ClosureCleanerSuite.scala|   2 +-
 .../apache/spark/util/ClosureCleanerSuite2.scala   |   4 +-
 project/MimaExcludes.scala |   4 +-
 .../catalyst/encoders/ExpressionEncoderSuite.scala |   4 +-
 .../org/apache/spark/streaming/StateSpec.scala |   6 +-
 15 files changed, 756 insertions(+), 314 deletions(-)

diff --git a/common/utils/pom.xml b/common/utils/pom.xml
index 37d1ea48d97..44cb30a19ff 100644
--- a/common/utils/pom.xml
+++ b/common/utils/pom.xml
@@ -39,6 +39,10 @@
   org.apache.spark
   spark-tags_${scala.binary.version}
 
+
+  org.apache.xbean
+  xbean-asm9-shaded
+
 
   com.fasterxml.jackson.core
   jackson-databind
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
similarity index 61%
rename from core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
rename to common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 29fb0206f90..ffa2f0e60b2 

[spark] branch master updated: [SPARK-45661][SQL][PYTHON] Add toNullable in StructType, MapType and ArrayType

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0219eb5984f [SPARK-45661][SQL][PYTHON] Add toNullable in StructType, 
MapType and ArrayType
0219eb5984f is described below

commit 0219eb5984f0f4a7209deb091b713ded10aebba3
Author: Hyukjin Kwon 
AuthorDate: Thu Oct 26 09:30:59 2023 +0900

[SPARK-45661][SQL][PYTHON] Add toNullable in StructType, MapType and 
ArrayType

### What changes were proposed in this pull request?

This PR proposes to add:

- `StructType.toNullable`
- `MapType.toNullable`
- `ArrayType.toNullable`

that return a nullable schema; a short Scala usage sketch follows.
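A usage sketch of the new Scala API (field names are made up):
```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("tags", ArrayType(StringType, containsNull = false), nullable = false)
))
// All nullability flags (StructField.nullable, ArrayType.containsNull,
// MapType.valueContainsNull) become true in the returned schema.
val nullableSchema = schema.toNullable
```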

### Why are the changes needed?

See 
https://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe
 as an example.

### Does this PR introduce _any_ user-facing change?

Yes, it adds new API in both Scala and Python:
- `StructType.toNullable`
- `MapType.toNullable`
- `ArrayType.toNullable`

### How was this patch tested?

For Scala, it just adds an alias.
For the Python side, doctests were added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43523 from HyukjinKwon/SPARK-45661.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/types.py| 124 +
 .../org/apache/spark/sql/types/ArrayType.scala |   8 ++
 .../scala/org/apache/spark/sql/types/MapType.scala |   8 ++
 .../org/apache/spark/sql/types/StructType.scala|   8 ++
 4 files changed, 148 insertions(+)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 01db75b2500..d6862d7178a 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -139,6 +139,9 @@ class DataType:
 """
 return obj
 
+def _as_nullable(self) -> "DataType":
+return self
+
 @classmethod
 def fromDDL(cls, ddl: str) -> "DataType":
 """
@@ -593,6 +596,41 @@ class ArrayType(DataType):
 def simpleString(self) -> str:
 return "array<%s>" % self.elementType.simpleString()
 
+def _as_nullable(self) -> "ArrayType":
+return ArrayType(self.elementType._as_nullable(), containsNull=True)
+
+def toNullable(self) -> "ArrayType":
+"""
+Returns the same data type but set all nullability fields are true
+(`StructField.nullable`, `ArrayType.containsNull`, and 
`MapType.valueContainsNull`).
+
+.. versionadded:: 4.0.0
+
+Returns
+---
+:class:`ArrayType`
+
+Examples
+
+Example 1: Simple nullability conversion
+
+>>> ArrayType(IntegerType(), containsNull=False).toNullable()
+ArrayType(IntegerType(), True)
+
+Example 2: Nested nullability conversion
+
+>>> ArrayType(
+... StructType([
+... StructField("b", IntegerType(), nullable=False),
+... StructField("c", ArrayType(IntegerType(), 
containsNull=False))
+... ]),
+... containsNull=False
+... ).toNullable()
+ArrayType(StructType([StructField('b', IntegerType(), True),
+StructField('c', ArrayType(IntegerType(), True), True)]), True)
+"""
+return self._as_nullable()
+
 def __repr__(self) -> str:
 return "ArrayType(%s, %s)" % (self.elementType, str(self.containsNull))
 
@@ -671,6 +709,44 @@ class MapType(DataType):
 def simpleString(self) -> str:
 return "map<%s,%s>" % (self.keyType.simpleString(), 
self.valueType.simpleString())
 
+def _as_nullable(self) -> "MapType":
+return MapType(
+self.keyType._as_nullable(), self.valueType._as_nullable(), 
valueContainsNull=True
+)
+
+def toNullable(self) -> "MapType":
+"""
+Returns the same data type but set all nullability fields are true
+(`StructField.nullable`, `ArrayType.containsNull`, and 
`MapType.valueContainsNull`).
+
+.. versionadded:: 4.0.0
+
+Returns
+---
+:class:`MapType`
+
+Examples
+
+Example 1: Simple nullability conversion
+
+>>> MapType(IntegerType(), StringType(), 
valueContainsNull=False).toNullable()
+MapType(IntegerType(), StringType(), True)
+
+Example 2: Nested nullability conversion
+
+>>> MapType(
+... StringType(),
+... MapType(
+... IntegerType(),
+... ArrayType(IntegerType(), containsNull=False),
+... valueContainsNull=False
+... ),
+... valueContainsNull=False
+... ).toNullable()
+MapType(StringType(), MapType(IntegerType(), 

[spark-connect-go] branch dependabot/go_modules/google.golang.org/grpc-1.56.3 created (now bcf0c58)

2023-10-25 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/go_modules/google.golang.org/grpc-1.56.3
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


  at bcf0c58  Bump google.golang.org/grpc from 1.54.0 to 1.56.3

No new revisions were added by this update.





[spark] branch master updated: [SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule

2023-10-25 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 344453761cb [SPARK-45660] Re-use Literal objects in ComputeCurrentTime 
rule
344453761cb is described below

commit 344453761cbca154a04a53d4c5d6c2b1eef59652
Author: Ole Sasse 
AuthorDate: Wed Oct 25 19:56:15 2023 +0300

[SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule

### What changes were proposed in this pull request?

The ComputeCurrentTime optimizer rule produces unique timestamp Literals for 
the current-time expressions of a query. For CurrentDate and LocalTimestamp, 
however, the Literal objects are not re-used; semantically equal objects are 
created for each instance. This can waste a lot of memory when there are many 
such Literal objects.

This PR adds a map that caches timestamp literals in case they are used 
more than once.
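The cache is a simple per-zone-id memoization; a stripped-down sketch of the pattern (simplified types, not the actual rule code):
```scala
import java.time.{Instant, ZoneId}
import scala.collection.mutable

// Compute one value per distinct zone id and re-use it afterwards
// (the real rule caches Literal objects rather than LocalDate values).
val currentDates = mutable.HashMap.empty[ZoneId, java.time.LocalDate]
def currentDateFor(zoneId: ZoneId, now: Instant): java.time.LocalDate =
  currentDates.getOrElseUpdate(zoneId, now.atZone(zoneId).toLocalDate)
```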

### Why are the changes needed?

A query that has many equal literals could use unnecessarily large amounts 
of memory.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new Unit Test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43524 from olaky/unique-timestamp-replacement-literals.

Authored-by: Ole Sasse 
Signed-off-by: Max Gekk 
---
 .../sql/catalyst/optimizer/finishAnalysis.scala| 15 ---
 .../optimizer/ComputeCurrentTimeSuite.scala| 30 +-
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 4052ccd6496..18c85999312 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.time.{Instant, LocalDateTime}
+import java.time.{Instant, LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,6 +79,8 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
 val currentTimestampMicros = instantToMicros(instant)
 val currentTime = Literal.create(currentTimestampMicros, TimestampType)
 val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
+val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
+val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]
 
 def transformCondition(treePatternbits: TreePatternBits): Boolean = {
   treePatternbits.containsPattern(CURRENT_LIKE)
@@ -88,12 +90,17 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
   case subQuery =>
 subQuery.transformAllExpressionsWithPruning(transformCondition) {
   case cd: CurrentDate =>
-Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, 
cd.zoneId), DateType)
+currentDates.getOrElseUpdate(cd.zoneId, {
+  Literal.create(
+DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), 
DateType)
+})
   case CurrentTimestamp() | Now() => currentTime
   case CurrentTimeZone() => timezone
   case localTimestamp: LocalTimestamp =>
-val asDateTime = LocalDateTime.ofInstant(instant, 
localTimestamp.zoneId)
-Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType)
+localTimestamps.getOrElseUpdate(localTimestamp.zoneId, {
+  val asDateTime = LocalDateTime.ofInstant(instant, 
localTimestamp.zoneId)
+  Literal.create(localDateTimeToMicros(asDateTime), 
TimestampNTZType)
+})
 }
 }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 8b76cc383c5..447d77855fb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -23,7 +23,7 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters.MapHasAsScala
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 
CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, 
LocalTimestamp, Now}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, 

[spark] branch branch-3.4 updated: [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring

2023-10-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new ecdb69f3db3 [SPARK-40154][PYTHON][DOCS] Correct storage level in 
Dataframe.cache docstring
ecdb69f3db3 is described below

commit ecdb69f3db3370aa7cf6ae8a52130379e465ca73
Author: Paul Staab 
AuthorDate: Wed Oct 25 07:36:15 2023 -0500

[SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache 
docstring

### What changes were proposed in this pull request?
Corrects the docstring of `DataFrame.cache` to give the correct storage level 
after it changed in Spark 3.0. It seems that the docstring of 
`DataFrame.persist` was updated, but `cache` was forgotten.

### Why are the changes needed?
The docstring claims that `cache` uses serialised storage, but it actually 
uses deserialised storage. I confirmed that this is still the case with Spark 
3.5.0 using the example code from the Jira ticket.
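A quick way to double-check the effective default from a Scala shell (illustrative; the docstring fix itself is on the Python side):
```scala
val df = spark.range(10)
df.cache()
// Expected to print a deserialized MEMORY_AND_DISK storage level,
// matching the corrected docstring.
println(df.storageLevel)
```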

### Does this PR introduce _any_ user-facing change?
Yes, the docstring changes.

### How was this patch tested?
The Github actions workflow succeeded.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43229 from paulstaab/SPARK-40154.

Authored-by: Paul Staab 
Signed-off-by: Sean Owen 
(cherry picked from commit 94607dd001b133a25dc9865f25b3f9e7f5a5daa3)
Signed-off-by: Sean Owen 
---
 python/pyspark/sql/dataframe.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 518bc9867d7..14426c51439 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1404,7 +1404,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 self.rdd.foreachPartition(f)  # type: ignore[arg-type]
 
 def cache(self) -> "DataFrame":
-"""Persists the :class:`DataFrame` with the default storage level 
(`MEMORY_AND_DISK`).
+"""Persists the :class:`DataFrame` with the default storage level 
(`MEMORY_AND_DISK_DESER`).
 
 .. versionadded:: 1.3.0
 
@@ -1413,7 +1413,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 Notes
 -
-The default storage level has changed to `MEMORY_AND_DISK` to match 
Scala in 2.0.
+The default storage level has changed to `MEMORY_AND_DISK_DESER` to 
match Scala in 3.0.
 
 Returns
 ---





[spark] branch branch-3.5 updated: [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring

2023-10-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 9e4411e2450 [SPARK-40154][PYTHON][DOCS] Correct storage level in 
Dataframe.cache docstring
9e4411e2450 is described below

commit 9e4411e2450d0503933626207b5e03308c30bc72
Author: Paul Staab 
AuthorDate: Wed Oct 25 07:36:15 2023 -0500

[SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache 
docstring

### What changes were proposed in this pull request?
Corrects the docstring of `DataFrame.cache` to give the correct storage level 
after it changed in Spark 3.0. It seems that the docstring of 
`DataFrame.persist` was updated, but `cache` was forgotten.

### Why are the changes needed?
The docstring claims that `cache` uses serialised storage, but it actually 
uses deserialised storage. I confirmed that this is still the case with Spark 
3.5.0 using the example code from the Jira ticket.

### Does this PR introduce _any_ user-facing change?
Yes, the docstring changes.

### How was this patch tested?
The Github actions workflow succeeded.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43229 from paulstaab/SPARK-40154.

Authored-by: Paul Staab 
Signed-off-by: Sean Owen 
(cherry picked from commit 94607dd001b133a25dc9865f25b3f9e7f5a5daa3)
Signed-off-by: Sean Owen 
---
 python/pyspark/sql/dataframe.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 30ed73d3c47..5707ae2a31f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1485,7 +1485,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 self.rdd.foreachPartition(f)  # type: ignore[arg-type]
 
 def cache(self) -> "DataFrame":
-"""Persists the :class:`DataFrame` with the default storage level 
(`MEMORY_AND_DISK`).
+"""Persists the :class:`DataFrame` with the default storage level 
(`MEMORY_AND_DISK_DESER`).
 
 .. versionadded:: 1.3.0
 
@@ -1494,7 +1494,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 Notes
 -
-The default storage level has changed to `MEMORY_AND_DISK` to match 
Scala in 2.0.
+The default storage level has changed to `MEMORY_AND_DISK_DESER` to 
match Scala in 3.0.
 
 Returns
 ---





[spark] branch master updated (a073bf38c7d -> 94607dd001b)

2023-10-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a073bf38c7d [SPARK-45209][CORE][UI] Flame Graph Support For Executor 
Thread Dump Page
 add 94607dd001b [SPARK-40154][PYTHON][DOCS] Correct storage level in 
Dataframe.cache docstring

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/dataframe.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)





[spark] branch master updated: [SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page

2023-10-25 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a073bf38c7d [SPARK-45209][CORE][UI] Flame Graph Support For Executor 
Thread Dump Page
a073bf38c7d is described below

commit a073bf38c7d8802e2ab12c54299e1541a48a394e
Author: Kent Yao 
AuthorDate: Wed Oct 25 18:29:43 2023 +0800

[SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page

### What changes were proposed in this pull request?

This PR draws a CPU flame graph from Java stack traces for executors and 
drivers. Currently, the Java stack trace is just a snapshot, not sampled at a 
certain frequency over a period. Sampling might be considered an upcoming 
feature, out of the scope of this PR.

![fg 
git](https://github.com/apache/spark/assets/8326978/c3f99a1a-78ee-4adb-be1f-e4afd5f307b7)

If you are new to flame graphs, there are also some references you can 
refer to learn about the basic concepts and details.

[1] [Flame Graphs](https://www.brendangregg.com/flamegraphs.html)
[2] [FLIP-165: Operator's Flame 
Graphs](https://cwiki.apache.org/confluence/display/FLINK/FLIP-165%3A+Operator%27s+Flame+Graphs)
[3] [Java in Flames. mixed-mode flame graphs provide a… | by Netflix 
Technology Blog](https://netflixtechblog.com/java-in-flames-e763b3d32166)
[4] 
[HProf](https://docs.oracle.com/javase/7/docs/technotes/samples/hprof.html)

 Pending features

This PR mainly focuses on the UI, independent of the profiling steps. What 
we might have in the future are:

- Flame graph support for the Task Thread Dump page added in SPARK-45151
- Add a `ProfilingExecutor(max, interval)` message to profile a whole executor
- Add a `ProfileTask(taskId, max, interval)` message to profile a certain task
- Different views for on/off/full CPUs
- Mixed mode profiling, which might rely upon some ext libs at runtime
- And so on.

### Why are the changes needed?

Performance is always an important design factor in Spark. It is desirable 
to provide better visibility into the distribution of CPU resources while 
executing user code alongside the Spark kernel. One of the most visually 
effective means to do that is [Flame 
Graphs](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html), which 
visually presents the data gathered by performance profiling tools used by 
developers for performance tuning their applications.

### Does this PR introduce _any_ user-facing change?

yes

### How was this patch tested?

locally
### Was this patch authored or co-authored using generative AI tooling?

no

Closes #42988 from yaooqinn/SPARK-45209.

Authored-by: Kent Yao 
Signed-off-by: yangjie01 
---
 LICENSE|  3 +-
 LICENSE-binary |  3 +-
 .../org/apache/spark/ui/static/d3-flamegraph.css   | 47 
 .../apache/spark/ui/static/d3-flamegraph.min.js|  2 +
 .../org/apache/spark/ui/static/flamegraph.js   | 36 
 .../spark/ui/exec/ExecutorThreadDumpPage.scala | 20 +
 .../spark/ui/flamegraph/FlamegraphNode.scala   | 50 ++
 dev/.rat-excludes  |  2 +
 8 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/LICENSE b/LICENSE
index 44983fd1259..3216134fd4b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -216,7 +216,8 @@ 
core/src/main/resources/org/apache/spark/ui/static/bootstrap*
 core/src/main/resources/org/apache/spark/ui/static/vis*
 docs/js/vendor/bootstrap.js
 
connector/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java
-
+core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.min.js
+core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css
 
 Python Software Foundation License
 --
diff --git a/LICENSE-binary b/LICENSE-binary
index 30fca96a883..c6f291f1108 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -413,7 +413,8 @@ 
core/src/main/java/org/apache/spark/util/collection/TimSort.java
 core/src/main/resources/org/apache/spark/ui/static/bootstrap*
 core/src/main/resources/org/apache/spark/ui/static/vis*
 docs/js/vendor/bootstrap.js
-
+core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.min.js
+core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css
 
 

 This product bundles various third-party components under other open source 
licenses.
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css 
b/core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css
new file mode 100644
index 

[spark] branch master updated: [SPARK-45656][SQL] Fix observation when named observations with the same name on different datasets

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7db9b2293fa [SPARK-45656][SQL] Fix observation when named observations 
with the same name on different datasets
7db9b2293fa is described below

commit 7db9b2293fa778073274d235dd72212b75d94073
Author: Takuya UESHIN 
AuthorDate: Wed Oct 25 16:59:26 2023 +0900

[SPARK-45656][SQL] Fix observation when named observations with the same 
name on different datasets

### What changes were proposed in this pull request?

Fixes `Observation` when named observations with the same name are used on 
different datasets.

### Why are the changes needed?

Currently, if there are observations with the same name on different 
datasets, one of them is overwritten by the other execution.

For example,

```py
>>> observation1 = Observation("named")
>>> df1 = spark.range(50)
>>> observed_df1 = df1.observe(observation1, count(lit(1)).alias("cnt"))
>>>
>>> observation2 = Observation("named")
>>> df2 = spark.range(100)
>>> observed_df2 = df2.observe(observation2, count(lit(1)).alias("cnt"))
>>>
>>> observed_df1.collect()
...
>>> observed_df2.collect()
...
>>> observation1.get
{'cnt': 50}
>>> observation2.get
{'cnt': 50}
```

`observation2` should return `{'cnt': 100}`.
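A rough Scala equivalent of the same scenario (a SparkSession `spark` is assumed), since the fix lives in the Scala `Observation` class:
```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{count, lit}

val observation1 = Observation("named")
val observed1 = spark.range(50).observe(observation1, count(lit(1)).alias("cnt"))

val observation2 = Observation("named")
val observed2 = spark.range(100).observe(observation2, count(lit(1)).alias("cnt"))

observed1.collect()
observed2.collect()
// With the fix, each Observation reports the metrics of its own dataset:
// observation1.get contains cnt = 50, observation2.get contains cnt = 100.
```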

### Does this PR introduce _any_ user-facing change?

Yes, the observations with the same name will be available if they observe 
different datasets.

### How was this patch tested?

Added the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43519 from ueshin/issues/SPARK-45656/observation.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_dataframe.py  | 18 ++
 .../main/scala/org/apache/spark/sql/Dataset.scala   |  2 +-
 .../scala/org/apache/spark/sql/Observation.scala| 21 +
 .../scala/org/apache/spark/sql/DatasetSuite.scala   | 21 +
 4 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py 
b/python/pyspark/sql/tests/test_dataframe.py
index 3c493a8ae3a..0a2e3a53946 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1023,6 +1023,24 @@ class DataFrameTestsMixin:
 self.assertGreaterEqual(row.cnt, 0)
 self.assertGreaterEqual(row.sum, 0)
 
+def test_observe_with_same_name_on_different_dataframe(self):
+# SPARK-45656: named observations with the same name on different 
datasets
+from pyspark.sql import Observation
+
+observation1 = Observation("named")
+df1 = self.spark.range(50)
+observed_df1 = df1.observe(observation1, count(lit(1)).alias("cnt"))
+
+observation2 = Observation("named")
+df2 = self.spark.range(100)
+observed_df2 = df2.observe(observation2, count(lit(1)).alias("cnt"))
+
+observed_df1.collect()
+observed_df2.collect()
+
+self.assertEqual(observation1.get, dict(cnt=50))
+self.assertEqual(observation2.get, dict(cnt=100))
+
 def test_sample(self):
 with self.assertRaises(PySparkTypeError) as pe:
 self.spark.range(1).sample()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5079cfcca9d..4f07133bb76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -201,7 +201,7 @@ class Dataset[T] private[sql](
   }
 
   // A globally unique id of this Dataset.
-  private val id = Dataset.curId.getAndIncrement()
+  private[sql] val id = Dataset.curId.getAndIncrement()
 
   queryExecution.assertAnalyzed()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
index ba40336fc14..14c4983794b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.jdk.CollectionConverters.MapHasAsJava
 
+import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.QueryExecutionListener
 
@@ -56,7 +57,7 @@ class Observation(val name: String) {
 
   private val listener: ObservationListener = ObservationListener(this)
 
-  @volatile private var sparkSession: Option[SparkSession] = None
+  @volatile private var ds: Option[Dataset[_]] = None
 
   @volatile 

[spark] branch branch-3.5 updated: [SPARK-45588][SPARK-45640][SQL][TESTS][3.5] Fix flaky ProtobufCatalystDataConversionSuite

2023-10-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 26f6663d768 [SPARK-45588][SPARK-45640][SQL][TESTS][3.5] Fix flaky 
ProtobufCatalystDataConversionSuite
26f6663d768 is described below

commit 26f6663d76849e1aa16833398082ba9b4a2e73af
Author: panbingkun 
AuthorDate: Wed Oct 25 15:59:00 2023 +0900

[SPARK-45588][SPARK-45640][SQL][TESTS][3.5] Fix flaky 
ProtobufCatalystDataConversionSuite

### What changes were proposed in this pull request?
The PR aims to fix the flaky ProtobufCatalystDataConversionSuite, including:
- Fix the type check (when the random value was an empty array, we didn't skip 
it; the original intention was to skip default values for types). [SPARK-45588]
- When data.get(0) is null, data.get(0).asInstanceOf[Array[Byte]].isEmpty 
throws a java.lang.NullPointerException. [SPARK-45640]

Backports the above to branch-3.5.
Master branch PRs: https://github.com/apache/spark/pull/43424 and 
https://github.com/apache/spark/pull/43493

### Why are the changes needed?
Fix flaky ProtobufCatalystDataConversionSuite.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA
- Manually test

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43521 from panbingkun/branch-3.5_SPARK-45640.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index d3e63a11a66..b7f17fece5f 100644
--- 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -137,7 +137,8 @@ class ProtobufCatalystDataConversionSuite
   while (
 data != null &&
 (data.get(0) == defaultValue ||
-  (dt == BinaryType &&
+  (dt.fields(0).dataType == BinaryType &&
+data.get(0) != null &&
 data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
 data = generator().asInstanceOf[Row]
 





[spark] branch master updated: [SPARK-45545][CORE] Pass SSLOptions wherever we create a SparkTransportConf

2023-10-25 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 08c7bacb4f9 [SPARK-45545][CORE] Pass SSLOptions wherever we create a 
SparkTransportConf
08c7bacb4f9 is described below

commit 08c7bacb4f9dc0343ba9730b00d792cec7a1cf1e
Author: Hasnain Lakhani 
AuthorDate: Wed Oct 25 01:35:56 2023 -0500

[SPARK-45545][CORE] Pass SSLOptions wherever we create a SparkTransportConf

### What changes were proposed in this pull request?

This change ensures that RPC SSL options settings inheritance works 
properly after https://github.com/apache/spark/pull/43238: we now pass 
`sslOptions` wherever we call `fromSparkConf`.

In addition to that minor mechanical change, tests are duplicated or added for 
every place that calls this method, so that each has a test case that runs with 
SSL support enabled in the config.

### Why are the changes needed?

These changes are needed to ensure that the RPC SSL functionality can work 
properly with settings inheritance. In addition, through these tests we can 
ensure that any changes to these modules are also tested with SSL support and 
avoid regressions in the future.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Full integration testing also done as part of 
https://github.com/apache/spark/pull/42685

Added some tests and ran them:

```
build/sbt
> project core
> testOnly org.apache.spark.*Ssl*
> testOnly org.apache.spark.network.netty.NettyBlockTransferSecuritySuite
```

and

```
build/sbt -Pyarn
> project yarn
> testOnly 
org.apache.spark.network.yarn.SslYarnShuffleServiceWithRocksDBBackendSuite
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43387 from hasnain-db/spark-tls-integrate-everywhere.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../scala/org/apache/spark/SecurityManager.scala   |  6 +++
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  7 ++-
 .../spark/deploy/ExternalShuffleService.scala  |  6 ++-
 .../executor/CoarseGrainedExecutorBackend.scala|  4 +-
 .../network/netty/NettyBlockTransferService.scala  |  6 ++-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   | 10 -
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  8 +++-
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  |  6 ++-
 .../org/apache/spark/storage/BlockManager.scala|  3 +-
 .../apache/spark/ExternalShuffleServiceSuite.scala |  8 +++-
 .../spark/SslExternalShuffleServiceSuite.scala | 52 ++
 .../org/apache/spark/SslShuffleNettySuite.scala| 26 +++
 .../CoarseGrainedExecutorBackendSuite.scala| 40 +++--
 .../netty/NettyBlockTransferSecuritySuite.scala| 45 +--
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   | 52 --
 .../apache/spark/rpc/netty/NettyRpcEnvSuite.scala  | 12 +++--
 .../spark/shuffle/ShuffleBlockPusherSuite.scala| 12 -
 .../sort/IndexShuffleBlockResolverSuite.scala  | 14 +-
 .../storage/BlockManagerReplicationSuite.scala | 40 ++---
 .../storage/SslBlockManagerReplicationSuite.scala  | 39 
 .../scala/org/apache/spark/util/SslTestUtils.scala | 35 +++
 .../network/yarn/SslYarnShuffleServiceSuite.scala  | 34 ++
 22 files changed, 379 insertions(+), 86 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index f8961fff8e1..ee9051d024c 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -292,6 +292,12 @@ private[spark] class SecurityManager(
*/
   def isSslRpcEnabled(): Boolean = sslRpcEnabled
 
+  /**
+   * Returns the SSLOptions object for the RPC namespace
+   * @return the SSLOptions object for the RPC namespace
+   */
+  def getRpcSSLOptions(): SSLOptions = rpcSSLOptions
+
   /**
* Gets the user used for authenticating SASL connections.
* For now use a single hardcoded user.
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 310dc828440..c2bae41d34e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -374,7 +374,12 @@ object SparkEnv extends Logging {
 }
 
 val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores)
+  val transConf = SparkTransportConf.fromSparkConf(
+conf,
+