[spark] branch master updated: [SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled

2023-10-02 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5b80639e643 [SPARK-45250][CORE] Support stage level task resource 
profile for yarn cluster when dynamic allocation disabled
5b80639e643 is described below

commit 5b80639e643b6dd09dd64c3f43ec039b2ef2f9fd
Author: Bobby Wang 
AuthorDate: Mon Oct 2 23:00:56 2023 -0500

[SPARK-45250][CORE] Support stage level task resource profile for yarn 
cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of https://github.com/apache/spark/pull/37268, which 
added support for stage-level task resource profiles on standalone clusters when 
dynamic allocation is disabled. This PR enables stage-level task resource 
profiles for YARN clusters.
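
A minimal sketch of the relaxed check, based on the condition shown in the diff for `ResourceProfileManager.isSupported`. The class name and the master-string matching are assumptions made for illustration; the real code also consults unit-test flags, which are left out here.

```java
// Sketch only: the master-string matching below is an assumption for
// illustration, not code taken from ResourceProfileManager.
final class TaskProfileSupportSketch {

  // Assumed: standalone masters start with "spark://", local clusters with "local-cluster".
  static boolean isStandaloneOrLocalCluster(String master) {
    return master.startsWith("spark://") || master.startsWith("local-cluster");
  }

  // Assumed: YARN is selected with the master string "yarn".
  static boolean isYarn(String master) {
    return master.equals("yarn");
  }

  // Mirrors the relaxed condition for a TaskResourceProfile when dynamic
  // allocation is disabled; the unit-test flags are intentionally omitted.
  static boolean taskProfileSupported(String master) {
    return isStandaloneOrLocalCluster(master) || isYarn(master);
  }

  public static void main(String[] args) {
    String[] masters = {"spark://host:7077", "local-cluster[1, 1, 1024]", "yarn", "local"};
    for (String master : masters) {
      System.out.println(master + " -> " + taskProfileSupported(master));
    }
  }
}
```

The four master strings follow the cases exercised in `ResourceProfileManagerSuite`: standalone, local cluster, and YARN pass, while plain `local` does not.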

### Why are the changes needed?

Users who run Spark ML/DL workloads on YARN would expect the 
stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The existing tests from https://github.com/apache/spark/pull/37268 also 
cover this PR, since both YARN and standalone clusters share the same 
TaskSchedulerImpl class, which implements this feature. In addition, this PR 
modifies the existing test to cover YARN clusters. I also 
performed some manual tests, which have been posted in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43030 from wbo4958/yarn-task-resoure-profile.

Authored-by: Bobby Wang 
Signed-off-by: Mridul Muralidharan
---
 .../apache/spark/resource/ResourceProfileManager.scala|  6 +++---
 .../spark/resource/ResourceProfileManagerSuite.scala  | 15 +--
 docs/configuration.md |  2 +-
 docs/running-on-yarn.md   |  6 +-
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 9f98d4d9c9c..cd7124a5724 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -67,9 +67,9 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
*/
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
 if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-  if ((notRunningUnitTests || testExceptionThrown) && 
!isStandaloneOrLocalCluster) {
-throw new SparkException("TaskResourceProfiles are only supported for 
Standalone " +
-  "cluster for now when dynamic allocation is disabled.")
+  if ((notRunningUnitTests || testExceptionThrown) && 
!(isStandaloneOrLocalCluster || isYarn)) {
+throw new SparkException("TaskResourceProfiles are only supported for 
Standalone and " +
+  "Yarn cluster for now when dynamic allocation is disabled.")
   }
 } else {
   val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index e97d5c7883a..77dc7bcb4c5 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -126,18 +126,29 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
 val defaultProf = rpmanager.defaultResourceProfile
 assert(rpmanager.isSupported(defaultProf))
 
-// task resource profile.
+// Standalone: supports task resource profile.
 val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
 val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
 assert(rpmanager.isSupported(taskProf))
 
+// Local: doesn't support task resource profile.
 conf.setMaster("local")
 rpmanager = new ResourceProfileManager(conf, listenerBus)
 val error = intercept[SparkException] {
   rpmanager.isSupported(taskProf)
 }.getMessage
 assert(error === "TaskResourceProfiles are only supported for Standalone " 
+
-  "cluster for now when dynamic allocation is disabled.")
+  "and Yarn cluster for now when dynamic allocation is disabled.")
+
+// Local cluster: supports task resource profile.
+conf.setMaster("local-cluster[1, 1, 1024]")
+rpmanager = new ResourceProfileManager(conf, listenerBus)
+assert(rpmanager.isSupported(taskProf))
+
+// Yarn: supports task resource profile.
+conf.setMaster("yarn")
+   

[spark] branch master updated: [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer

2023-10-02 Thread mridulm80

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b01dce2b2a5 [SPARK-45378][CORE] Add convertToNettyForSsl to 
ManagedBuffer
b01dce2b2a5 is described below

commit b01dce2b2a57b933283d6fd350aa917d3cd76d83
Author: Hasnain Lakhani 
AuthorDate: Mon Oct 2 22:56:03 2023 -0500

[SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer

### What changes were proposed in this pull request?

As the title suggests. In addition to that API, add a config to the 
`TransportConf` to configure the default block size if desired.

### Why are the changes needed?

Netty's SSL support does not support zero-copy transfers. In order to 
support SSL using Netty we need to add another API to the `ManagedBuffer` which 
lets buffers return a different data type.
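
As a rough illustration of the alternative to zero-copy, the sketch below reads a stream in fixed-size chunks using only the JDK. It is a stand-in for the idea behind the `ChunkedStream` used in the diff, not Netty code; `ChunkedReadSketch`, `readChunks`, and the chunk size are hypothetical names and values.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: a JDK-only stand-in for chunked transfer, not Netty's ChunkedStream.
final class ChunkedReadSketch {

  // Reads the stream into chunks of at most chunkSize bytes each.
  static List<byte[]> readChunks(InputStream in, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<byte[]> chunks = new ArrayList<>();
    try {
      byte[] chunk;
      // readNBytes (Java 11+) returns an empty array at end of stream.
      while ((chunk = in.readNBytes(chunkSize)).length > 0) {
        chunks.add(chunk);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<byte[]> chunks = readChunks(new ByteArrayInputStream(new byte[10]), 4);
    for (byte[] chunk : chunks) {
      System.out.println("chunk of " + chunk.length + " bytes");
    }
  }
}
```

Each chunk passes through user-space memory, which is where encryption can be applied before the bytes are written out.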

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI. This will have tests added later - it's tested as part of 
https://github.com/apache/spark/pull/42685 from which this is split out.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43166 from hasnain-db/spark-tls-buffers.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan
---
 .../network/buffer/FileSegmentManagedBuffer.java |  7 +++
 .../apache/spark/network/buffer/ManagedBuffer.java   | 14 ++
 .../spark/network/buffer/NettyManagedBuffer.java |  5 +
 .../spark/network/buffer/NioManagedBuffer.java   |  5 +
 .../org/apache/spark/network/util/TransportConf.java |  8 
 .../org/apache/spark/network/TestManagedBuffer.java  |  5 +
 .../org/apache/spark/storage/BlockManager.scala  |  9 +
 .../spark/storage/BlockManagerManagedBuffer.scala|  2 ++
 .../scala/org/apache/spark/storage/DiskStore.scala   | 13 +
 .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 20 
 .../spark/network/BlockTransferServiceSuite.scala|  2 ++
 .../spark/shuffle/BlockStoreShuffleReaderSuite.scala |  1 +
 12 files changed, 91 insertions(+)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 66566b67870..dd7c2061ec9 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -28,6 +28,7 @@ import java.nio.file.StandardOpenOption;
 
 import com.google.common.io.ByteStreams;
 import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.stream.ChunkedStream;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -137,6 +138,12 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
 }
   }
 
+  @Override
+  public Object convertToNettyForSsl() throws IOException {
+// Cannot use zero-copy with HTTPS
+return new ChunkedStream(createInputStream(), conf.sslShuffleChunkSize());
+  }
+
   public File getFile() { return file; }
 
   public long getOffset() { return offset; }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
index 4dd8cec2900..893aa106a3f 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -75,4 +75,18 @@ public abstract class ManagedBuffer {
* the caller will be responsible for releasing this new reference.
*/
   public abstract Object convertToNetty() throws IOException;
+
+  /**
+   * Convert the buffer into a Netty object, used to write the data out with 
SSL encryption,
+   * which cannot use {@link io.netty.channel.FileRegion}.
+   * The return value is either a {@link io.netty.buffer.ByteBuf},
+   * a {@link io.netty.handler.stream.ChunkedStream}, or a {@link 
java.io.InputStream}.
+   *
+   * If this method returns a ByteBuf, then that buffer's reference count will 
be incremented and
+   * the caller will be responsible for releasing this new reference.
+   *
+   * Once `kernel.ssl.sendfile` and OpenSSL's `ssl_sendfile` are more widely 
adopted (and supported
+   * in Netty), we can potentially deprecate these APIs and just use 
`convertToNetty`.
+   */
+  public abstract Object convertToNettyForSsl() throws IOException;
 }
diff --git 

[spark] branch branch-3.5 updated: [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client

2023-10-02 Thread hvanhovell

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new c5203abcbd1 [SPARK-45371][CONNECT] Fix shading issues in the Spark 
Connect Scala Client
c5203abcbd1 is described below

commit c5203abcbd191423071ef3603e95a7941bb1eec2
Author: Herman van Hovell 
AuthorDate: Mon Oct 2 13:03:06 2023 -0400

[SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client

### What changes were proposed in this pull request?
This PR fixes shading for the Spark Connect Scala Client maven build. The 
following things are addressed:
- Guava and protobuf are included in the shaded jars. These were missing, 
and were causing users to see `ClassNotFoundException`s.
- Fixed duplicate shading of guava. We use the parent pom's location now.
- Fixed duplicate Netty dependency (shaded and transitive). One was used 
for GRPC and the other was needed by Arrow. This was fixed by pulling arrow 
into the shaded jar.
- Use the same package as the shading defined in the parent package.

### Why are the changes needed?
The maven artifacts for the Spark Connect Scala Client are currently broken.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual tests.
 Step 1:  Build new shaded library and install it in local maven 
repository
`build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests`
 Step 2: Start Connect Server
`connector/connect/bin/spark-connect`
 Step 3: Launch REPL using the newly created library
This step requires [coursier](https://get-coursier.io/) to be installed.
`cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local 
com.lihaoyi:::ammonite:2.5.11 
org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt 
--add-opens=java.base/java.nio=ALL-UNNAMED -M 
org.apache.spark.sql.application.ConnectRepl`
 Step 4: Run a bunch of commands:
```scala
// Check version
spark.version

// Run a simple query
{
  spark.range(1, 1, 1)
.select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2"))
.groupBy($"group")
.agg(
  avg($"v1").as("v1_avg"),
  avg($"v2").as("v2_avg"))
.show()
}

// Run a streaming query
{
  import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
  val query_name = "simple_streaming"
  val stream = spark.readStream
.format("rate")
.option("numPartitions", "1")
.option("rowsPerSecond", "10")
.load()
.withWatermark("timestamp", "10 milliseconds")
.groupBy(window(col("timestamp"), "10 milliseconds"))
.count()
.selectExpr("window.start as timestamp", "count as num_events")
.writeStream
.format("memory")
.queryName(query_name)
.trigger(ProcessingTimeTrigger.create("10 milliseconds"))
  // run for 20 seconds
  val query = stream.start()
  val start = System.currentTimeMillis()
  val end = System.currentTimeMillis() + 20 * 1000
  while (System.currentTimeMillis() < end) {
println(s"time: ${System.currentTimeMillis() - start} ms")
println(query.status)
spark.sql(s"select * from ${query_name}").show()
Thread.sleep(500)
  }
  query.stop()
}
```

Closes #43195 from hvanhovell/SPARK-45371.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
(cherry picked from commit e53abbbceaa2c41babaa23fe4c2f282f559b4c03)
Signed-off-by: Herman van Hovell 
---
 connector/connect/client/jvm/pom.xml | 39 +++-
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/pom.xml 
b/connector/connect/client/jvm/pom.xml
index 67227ef38eb..236e5850b76 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -50,10 +50,20 @@
   spark-sketch_${scala.binary.version}
   ${project.version}
 
+
 
   com.google.guava
   guava
   ${connect.guava.version}
+  compile
+
+
+  com.google.protobuf
+  protobuf-java
+  compile
 
 
   com.lihaoyi
@@ -85,6 +95,7 @@
 maven-shade-plugin
 
   false
+  true
   
 
   com.google.android:*
@@ -92,52 +103,62 @@
   com.google.code.findbugs:*
   com.google.code.gson:*
   com.google.errorprone:*
-  com.google.guava:*
   com.google.j2objc:*
   com.google.protobuf:*
+  com.google.flatbuffers:*
   io.grpc:*
   io.netty:*
   io.perfmark:*
+  

[spark] branch master updated: [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client

2023-10-02 Thread hvanhovell

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e53abbbceaa [SPARK-45371][CONNECT] Fix shading issues in the Spark 
Connect Scala Client
e53abbbceaa is described below

commit e53abbbceaa2c41babaa23fe4c2f282f559b4c03
Author: Herman van Hovell 
AuthorDate: Mon Oct 2 13:03:06 2023 -0400

[SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client

### What changes were proposed in this pull request?
This PR fixes shading for the Spark Connect Scala Client maven build. The 
following things are addressed:
- Guava and protobuf are included in the shaded jars. These were missing, 
and were causing users to see `ClassNotFoundException`s.
- Fixed duplicate shading of guava. We use the parent pom's location now.
- Fixed duplicate Netty dependency (shaded and transitive). One was used 
for GRPC and the other was needed by Arrow. This was fixed by pulling arrow 
into the shaded jar.
- Use the same package as the shading defined in the parent package.

### Why are the changes needed?
The maven artifacts for the Spark Connect Scala Client are currently broken.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual tests.
 Step 1:  Build new shaded library and install it in local maven 
repository
`build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests`
 Step 2: Start Connect Server
`connector/connect/bin/spark-connect`
 Step 3: Launch REPL using the newly created library
This step requires [coursier](https://get-coursier.io/) to be installed.
`cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local 
com.lihaoyi:::ammonite:2.5.11 
org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt 
--add-opens=java.base/java.nio=ALL-UNNAMED -M 
org.apache.spark.sql.application.ConnectRepl`
 Step 4: Run a bunch of commands:
```scala
// Check version
spark.version

// Run a simple query
{
  spark.range(1, 1, 1)
.select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2"))
.groupBy($"group")
.agg(
  avg($"v1").as("v1_avg"),
  avg($"v2").as("v2_avg"))
.show()
}

// Run a streaming query
{
  import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
  val query_name = "simple_streaming"
  val stream = spark.readStream
.format("rate")
.option("numPartitions", "1")
.option("rowsPerSecond", "10")
.load()
.withWatermark("timestamp", "10 milliseconds")
.groupBy(window(col("timestamp"), "10 milliseconds"))
.count()
.selectExpr("window.start as timestamp", "count as num_events")
.writeStream
.format("memory")
.queryName(query_name)
.trigger(ProcessingTimeTrigger.create("10 milliseconds"))
  // run for 20 seconds
  val query = stream.start()
  val start = System.currentTimeMillis()
  val end = System.currentTimeMillis() + 20 * 1000
  while (System.currentTimeMillis() < end) {
println(s"time: ${System.currentTimeMillis() - start} ms")
println(query.status)
spark.sql(s"select * from ${query_name}").show()
Thread.sleep(500)
  }
  query.stop()
}
```

Closes #43195 from hvanhovell/SPARK-45371.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 connector/connect/client/jvm/pom.xml | 39 +++-
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/pom.xml 
b/connector/connect/client/jvm/pom.xml
index 9ca66b5c29c..a9040107f38 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -50,10 +50,20 @@
   spark-sketch_${scala.binary.version}
   ${project.version}
 
+
 
   com.google.guava
   guava
   ${connect.guava.version}
+  compile
+
+
+  com.google.protobuf
+  protobuf-java
+  compile
 
 
   com.lihaoyi
@@ -85,6 +95,7 @@
 maven-shade-plugin
 
   false
+  true
   
 
   com.google.android:*
@@ -92,52 +103,62 @@
   com.google.code.findbugs:*
   com.google.code.gson:*
   com.google.errorprone:*
-  com.google.guava:*
   com.google.j2objc:*
   com.google.protobuf:*
+  com.google.flatbuffers:*
   io.grpc:*
   io.netty:*
   io.perfmark:*
+  org.apache.arrow:*
   org.codehaus.mojo:*
   org.checkerframework:*
   

[spark] branch master updated: [SPARK-45377][CORE] Handle InputStream in NettyLogger

2023-10-02 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cdbb301143d [SPARK-45377][CORE] Handle InputStream in NettyLogger
cdbb301143d is described below

commit cdbb301143de2e9a0ea525d20867948f49863842
Author: Hasnain Lakhani 
AuthorDate: Mon Oct 2 08:27:50 2023 -0500

[SPARK-45377][CORE] Handle InputStream in NettyLogger

### What changes were proposed in this pull request?

Handle `InputStream`s in the `NettyLogger` so we can print out how many 
available bytes there are.

### Why are the changes needed?

As part of the SSL support we are going to transfer `InputStream`s via 
Netty, and this functionality makes it easy to see the size of the streams in 
the log at a glance.
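
The size lookup can be sketched with the JDK alone. This mirrors the `available()` call and the `-1` fallback from the diff; the class and method names are hypothetical, and the Netty-specific formatting is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch only: isolates the size lookup; the class and method names are hypothetical.
final class AvailableBytesSketch {

  // Returns e.g. "5B" for a stream with five readable bytes, or "-1B" if available() fails.
  static String describe(InputStream in) {
    int available = -1;
    try {
      available = in.available();
    } catch (IOException ex) {
      // Swallow, but keep -1 to indicate that an error happened.
    }
    return available + "B";
  }

  public static void main(String[] args) {
    System.out.println(describe(new ByteArrayInputStream(new byte[5])));
  }
}
```

Note that `available()` is only an estimate of the bytes readable without blocking, so the logged number is a hint rather than the full stream length.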

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI. Tested as part of the changes in 
https://github.com/apache/spark/pull/42685, from which this is split out; I 
observed the logs there.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43165 from hasnain-db/spark-tls-netty-logger.

Authored-by: Hasnain Lakhani 
Signed-off-by: Sean Owen 
---
 .../main/java/org/apache/spark/network/util/NettyLogger.java  | 11 +++
 1 file changed, 11 insertions(+)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java
index 9398726a926..f4c0df6239d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java
@@ -17,6 +17,9 @@
 
 package org.apache.spark.network.util;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufHolder;
 import io.netty.channel.ChannelHandlerContext;
@@ -42,6 +45,14 @@ public class NettyLogger {
   } else if (arg instanceof ByteBufHolder) {
 return format(ctx, eventName) + " " +
   ((ByteBufHolder) arg).content().readableBytes() + "B";
+  } else if (arg instanceof InputStream) {
+int available = -1;
+try {
+  available = ((InputStream) arg).available();
+} catch (IOException ex) {
+  // Swallow, but return -1 to indicate an error happened
+}
+return format(ctx, eventName, arg) + " " + available + "B";
   } else {
 return super.format(ctx, eventName, arg);
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45389][SQL][HIVE] Correct MetaException matching rule on getting partition metadata

2023-10-02 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8b3ad2fc329 [SPARK-45389][SQL][HIVE] Correct MetaException matching 
rule on getting partition metadata
8b3ad2fc329 is described below

commit 8b3ad2fc329e1813366430df7189d27b17133283
Author: Cheng Pan 
AuthorDate: Mon Oct 2 08:25:51 2023 -0500

[SPARK-45389][SQL][HIVE] Correct MetaException matching rule on getting 
partition metadata

### What changes were proposed in this pull request?

This PR aims to fix the HMS call fallback logic introduced in SPARK-35437.

```patch
try {
  ...
  hive.getPartitionNames
  ...
  hive.getPartitionsByNames
} catch {
- case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
+ case ex: HiveException if ex.getCause.isInstanceOf[MetaException] =>
  ...
}
```

### Why are the changes needed?

A direct method call won't throw `InvocationTargetException`. Checking the 
code of `hive.getPartitionNames` and `hive.getPartitionsByNames` shows that both 
wrap a `MetaException` in a `HiveException` when one is thrown.
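
The difference can be reproduced with plain JDK reflection. In the sketch below, `WrapperException` and `MetaLikeException` are hypothetical stand-ins for `HiveException` and `MetaException`, and `getNames` stands in for the Hive calls; none of this is Hive code.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Sketch only: stand-in exception types, no Hive classes involved.
final class InvocationWrappingSketch {

  static class MetaLikeException extends Exception {
    MetaLikeException(String message) { super(message); }
  }

  static class WrapperException extends Exception {
    WrapperException(Throwable cause) { super(cause); }
  }

  // Stand-in for a Hive call that wraps the underlying failure.
  static void getNames() throws WrapperException {
    throw new WrapperException(new MetaLikeException("metastore failure"));
  }

  // Direct call: the caller sees the wrapper exception itself.
  static String direct() {
    try {
      getNames();
      return "no exception";
    } catch (WrapperException e) {
      return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
    }
  }

  // Reflective call: Method.invoke adds an InvocationTargetException layer on top.
  static String reflective() {
    try {
      Method m = InvocationWrappingSketch.class.getDeclaredMethod("getNames");
      m.invoke(null);
      return "no exception";
    } catch (InvocationTargetException e) {
      return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("direct:     " + direct());
    System.out.println("reflective: " + reflective());
  }
}
```

A `catch` clause written for the reflective shape never matches on the direct path, which is why the old pattern could not trigger the fallback.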

### Does this PR introduce _any_ user-facing change?

Yes, it should be a bug fix.

### How was this patch tested?

Pass GA and code review. (I'm not sure how to construct/simulate a 
MetaException during the HMS thrift call with the current HMS testing 
infrastructure)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43191 from pan3793/SPARK-45389.

Authored-by: Cheng Pan 
Signed-off-by: Sean Owen 
---
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 64aa7d2d6fa..9943c0178fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -438,7 +438,7 @@ private[client] class Shim_v2_0 extends Shim with Logging {
 recordHiveCall()
 hive.getPartitionsByNames(table, partNames.asJava)
   } catch {
-case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
+case ex: HiveException if ex.getCause.isInstanceOf[MetaException] =>
   logWarning("Caught Hive MetaException attempting to get partition 
metadata by " +
 "filter from client side. Falling back to fetching all partition 
metadata", ex)
   recordHiveCall()





[spark] branch branch-3.5 updated: [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs

2023-10-02 Thread srowen

srowen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 845e4f6c5bc [MINOR][DOCS] Fix Python code sample for 
StreamingQueryListener: Reporting Metrics programmatically using Asynchronous 
APIs
845e4f6c5bc is described below

commit 845e4f6c5bcf3a368ee78757f3a74b390cdce5c0
Author: Peter Kaszt 
AuthorDate: Mon Oct 2 07:48:56 2023 -0500

[MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting 
Metrics programmatically using Asynchronous APIs

Fix Python language code sample in the docs for _StreamingQueryListener_:
Reporting Metrics programmatically using Asynchronous APIs section.

### What changes were proposed in this pull request?
The code sample in the [Reporting Metrics programmatically using 
Asynchronous 
APIs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
 section was this:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        println("Query terminated: " + queryTerminated.id)

    def onQueryTerminated(self, event):
        println("Query made progress: " + queryProgress.progress)

spark.streams.addListener(Listener())
```

This is not valid Python code, and it has the QueryProgress and 
QueryTerminated prints swapped. Proposed change/fix:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
        print("Query terminated: " + queryTerminated.id)

spark.streams.addListener(Listener())
```

### Why are the changes needed?
To fix documentation errors.

### Does this PR introduce _any_ user-facing change?
Yes. -> Sample python code snippet is fixed in docs (see above).

### How was this patch tested?
Checked with github's .md preview, and built the docs according to the 
readme.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43190 from kasztp/master.

Authored-by: Peter Kaszt 
Signed-off-by: Sean Owen 
(cherry picked from commit d708fd7b68bf0c9964e861cb2c81818d17d7136e)
Signed-off-by: Sean Owen 
---
 docs/structured-streaming-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 76a22621a0e..3e87c45a349 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -3831,10 +3831,10 @@ class Listener(StreamingQueryListener):
 print("Query started: " + queryStarted.id)
 
 def onQueryProgress(self, event):
-println("Query terminated: " + queryTerminated.id)
+print("Query made progress: " + queryProgress.progress)
 
 def onQueryTerminated(self, event):
-println("Query made progress: " + queryProgress.progress)
+   print("Query terminated: " + queryTerminated.id)
 
 
 spark.streams.addListener(Listener())





[spark] branch master updated: [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs

2023-10-02 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d708fd7b68b [MINOR][DOCS] Fix Python code sample for 
StreamingQueryListener: Reporting Metrics programmatically using Asynchronous 
APIs
d708fd7b68b is described below

commit d708fd7b68bf0c9964e861cb2c81818d17d7136e
Author: Peter Kaszt 
AuthorDate: Mon Oct 2 07:48:56 2023 -0500

[MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting 
Metrics programmatically using Asynchronous APIs

Fix Python language code sample in the docs for _StreamingQueryListener_:
Reporting Metrics programmatically using Asynchronous APIs section.

### What changes were proposed in this pull request?
The code sample in the [Reporting Metrics programmatically using 
Asynchronous 
APIs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
 section was this:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        println("Query terminated: " + queryTerminated.id)

    def onQueryTerminated(self, event):
        println("Query made progress: " + queryProgress.progress)

spark.streams.addListener(Listener())
```

This is not valid Python code, and it has the QueryProgress and 
QueryTerminated prints swapped. Proposed change/fix:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
        print("Query terminated: " + queryTerminated.id)

spark.streams.addListener(Listener())
```

### Why are the changes needed?
To fix documentation errors.

### Does this PR introduce _any_ user-facing change?
Yes. -> Sample python code snippet is fixed in docs (see above).

### How was this patch tested?
Checked with github's .md preview, and built the docs according to the 
readme.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43190 from kasztp/master.

Authored-by: Peter Kaszt 
Signed-off-by: Sean Owen 
---
 docs/structured-streaming-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 70e763be0d7..774422a9cd9 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -3837,10 +3837,10 @@ class Listener(StreamingQueryListener):
 print("Query started: " + queryStarted.id)
 
 def onQueryProgress(self, event):
-println("Query terminated: " + queryTerminated.id)
+print("Query made progress: " + queryProgress.progress)
 
 def onQueryTerminated(self, event):
-println("Query made progress: " + queryProgress.progress)
+   print("Query terminated: " + queryTerminated.id)
 
 
 spark.streams.addListener(Listener())





[spark] branch master updated: [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)

2023-10-02 Thread weichenxu123

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a0c9ab63f3b [SPARK-45386][SQL]: Fix correctness issue with persist 
using StorageLevel.NONE on Dataset (#43188)
a0c9ab63f3b is described below

commit a0c9ab63f3bcf4c9bb56c407375ce1c8cc26fb02
Author: Emil Ejbyfeldt 
AuthorDate: Mon Oct 2 11:36:53 2023 +0200

[SPARK-45386][SQL]: Fix correctness issue with persist using 
StorageLevel.NONE on Dataset (#43188)

* SPARK-45386: Fix correctness issue with StorageLevel.NONE

* Move to CacheManager

* Add comment
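
The branch order introduced in `CacheManager` can be sketched as follows. This is a simplified model of the three-way check visible in the diff; the `Level` enum, the string-keyed registry, and the return values are assumptions made for illustration and do not reflect Spark's actual types.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: a toy registry modelling the branch order, not Spark's CacheManager.
final class CacheRegistrationSketch {

  // Hypothetical stand-in for StorageLevel.
  enum Level { NONE, MEMORY_ONLY }

  private final Set<String> cachedPlans = new HashSet<>();

  // Returns a label describing which branch was taken.
  String cacheQuery(String plan, Level level) {
    if (level == Level.NONE) {
      // Do nothing: NONE will not actually cache any data.
      return "skipped";
    } else if (cachedPlans.contains(plan)) {
      return "already cached";
    } else {
      cachedPlans.add(plan);
      return "cached";
    }
  }

  public static void main(String[] args) {
    CacheRegistrationSketch manager = new CacheRegistrationSketch();
    System.out.println(manager.cacheQuery("plan-a", Level.NONE));
    System.out.println(manager.cacheQuery("plan-a", Level.MEMORY_ONLY));
    System.out.println(manager.cacheQuery("plan-a", Level.MEMORY_ONLY));
  }
}
```

Because the `NONE` check comes first, such a request never registers the plan, so a later request with a real storage level still takes the caching branch.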
---
 .../main/scala/org/apache/spark/sql/execution/CacheManager.scala| 4 +++-
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e0..e906c74f8a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
   planToCache: LogicalPlan,
   tableName: Option[String],
   storageLevel: StorageLevel): Unit = {
-if (lookupCachedData(planToCache).nonEmpty) {
+if (storageLevel == StorageLevel.NONE) {
+  // Do nothing for StorageLevel.NONE since it will not actually cache any 
data.
+} else if (lookupCachedData(planToCache).nonEmpty) {
   logWarning("Asked to cache already cached data.")
 } else {
   val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 04e619fa908..8fb25e120f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2604,6 +2605,11 @@ class DatasetSuite extends QueryTest
 parameters = Map("cls" -> classOf[Array[Int]].getName))
 }
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct 
count") {
+val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+assert(ds.count() == 2)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest

