[spark] branch master updated: [SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5b80639e643 [SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled 5b80639e643 is described below commit 5b80639e643b6dd09dd64c3f43ec039b2ef2f9fd Author: Bobby Wang AuthorDate: Mon Oct 2 23:00:56 2023 -0500 [SPARK-45250][CORE] Support stage level task resource profile for yarn cluster when dynamic allocation disabled ### What changes were proposed in this pull request? This PR is a follow-up of https://github.com/apache/spark/pull/37268 which supports stage level task resource profile for standalone cluster when dynamic allocation disabled. This PR enables stage-level task resource profile for yarn cluster. ### Why are the changes needed? Users who work on spark ML/DL cases running on Yarn would expect stage-level task resource profile feature. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The current tests of https://github.com/apache/spark/pull/37268 can also cover this PR since both yarn and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modifying the existing test to cover yarn cluster. Apart from that, I also performed some manual tests which have been updated in the comments. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43030 from wbo4958/yarn-task-resoure-profile. 
Authored-by: Bobby Wang Signed-off-by: Mridul Muralidharan gmail.com> --- .../apache/spark/resource/ResourceProfileManager.scala| 6 +++--- .../spark/resource/ResourceProfileManagerSuite.scala | 15 +-- docs/configuration.md | 2 +- docs/running-on-yarn.md | 6 +- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 9f98d4d9c9c..cd7124a5724 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -67,9 +67,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, */ private[spark] def isSupported(rp: ResourceProfile): Boolean = { if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) { - if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) { -throw new SparkException("TaskResourceProfiles are only supported for Standalone " + - "cluster for now when dynamic allocation is disabled.") + if ((notRunningUnitTests || testExceptionThrown) && !(isStandaloneOrLocalCluster || isYarn)) { +throw new SparkException("TaskResourceProfiles are only supported for Standalone and " + + "Yarn cluster for now when dynamic allocation is disabled.") } } else { val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala index e97d5c7883a..77dc7bcb4c5 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala @@ -126,18 +126,29 @@ class ResourceProfileManagerSuite extends SparkFunSuite { val defaultProf = rpmanager.defaultResourceProfile 
assert(rpmanager.isSupported(defaultProf)) -// task resource profile. +// Standalone: supports task resource profile. val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1) val taskProf = new TaskResourceProfile(gpuTaskReq.requests) assert(rpmanager.isSupported(taskProf)) +// Local: doesn't support task resource profile. conf.setMaster("local") rpmanager = new ResourceProfileManager(conf, listenerBus) val error = intercept[SparkException] { rpmanager.isSupported(taskProf) }.getMessage assert(error === "TaskResourceProfiles are only supported for Standalone " + - "cluster for now when dynamic allocation is disabled.") + "and Yarn cluster for now when dynamic allocation is disabled.") + +// Local cluster: supports task resource profile. +conf.setMaster("local-cluster[1, 1, 1024]") +rpmanager = new ResourceProfileManager(conf, listenerBus) +assert(rpmanager.isSupported(taskProf)) + +// Yarn: supports task resource profile. +conf.setMaster("yarn") +
[spark] branch master updated: [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b01dce2b2a5 [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer b01dce2b2a5 is described below commit b01dce2b2a57b933283d6fd350aa917d3cd76d83 Author: Hasnain Lakhani AuthorDate: Mon Oct 2 22:56:03 2023 -0500 [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer ### What changes were proposed in this pull request? As the title suggests. In addition to that API, add a config to the `TransportConf` to configure the default block size if desired. ### Why are the changes needed? Netty's SSL support does not support zero-copy transfers. In order to support SSL using Netty we need to add another API to the `ManagedBuffer` which lets buffers return a different data type. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI. This will have tests added later - it's tested as part of https://github.com/apache/spark/pull/42685 from which this is split out. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43166 from hasnain-db/spark-tls-buffers. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> --- .../network/buffer/FileSegmentManagedBuffer.java | 7 +++ .../apache/spark/network/buffer/ManagedBuffer.java | 14 ++ .../spark/network/buffer/NettyManagedBuffer.java | 5 + .../spark/network/buffer/NioManagedBuffer.java | 5 + .../org/apache/spark/network/util/TransportConf.java | 8 .../org/apache/spark/network/TestManagedBuffer.java | 5 + .../org/apache/spark/storage/BlockManager.scala | 9 + .../spark/storage/BlockManagerManagedBuffer.scala| 2 ++ .../scala/org/apache/spark/storage/DiskStore.scala | 13 + .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 20 .../spark/network/BlockTransferServiceSuite.scala| 2 ++ .../spark/shuffle/BlockStoreShuffleReaderSuite.scala | 1 + 12 files changed, 91 insertions(+) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 66566b67870..dd7c2061ec9 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -28,6 +28,7 @@ import java.nio.file.StandardOpenOption; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; +import io.netty.handler.stream.ChunkedStream; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -137,6 +138,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { } } + @Override + public Object convertToNettyForSsl() throws IOException { +// Cannot use zero-copy with HTTPS +return new ChunkedStream(createInputStream(), conf.sslShuffleChunkSize()); + } + public File getFile() { return file; } public long getOffset() { return offset; } diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java index 4dd8cec2900..893aa106a3f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java @@ -75,4 +75,18 @@ public abstract class ManagedBuffer { * the caller will be responsible for releasing this new reference. */ public abstract Object convertToNetty() throws IOException; + + /** + * Convert the buffer into a Netty object, used to write the data out with SSL encryption, + * which cannot use {@link io.netty.channel.FileRegion}. + * The return value is either a {@link io.netty.buffer.ByteBuf}, + * a {@link io.netty.handler.stream.ChunkedStream}, or a {@link java.io.InputStream}. + * + * If this method returns a ByteBuf, then that buffer's reference count will be incremented and + * the caller will be responsible for releasing this new reference. + * + * Once `kernel.ssl.sendfile` and OpenSSL's `ssl_sendfile` are more widely adopted (and supported + * in Netty), we can potentially deprecate these APIs and just use `convertToNetty`. + */ + public abstract Object convertToNettyForSsl() throws IOException; } diff --git
[spark] branch branch-3.5 updated: [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new c5203abcbd1 [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client c5203abcbd1 is described below commit c5203abcbd191423071ef3603e95a7941bb1eec2 Author: Herman van Hovell AuthorDate: Mon Oct 2 13:03:06 2023 -0400 [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client ### What changes were proposed in this pull request? This PR fixes shading for the Spark Connect Scala Client maven build. The following things are addressed: - Guava and protobuf are included in the shaded jars. These were missing, and were causing users to see `ClassNotFoundException`s. - Fixed duplicate shading of guava. We use the parent pom's location now. - Fixed duplicate Netty dependency (shaded and transitive). One was used for GRPC and the other was needed by Arrow. This was fixed by pulling arrow into the shaded jar. - Use the same package as the shading defined in the parent package. ### Why are the changes needed? The maven artifacts for the Spark Connect Scala Client are currently broken. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual tests. Step 1: Build new shaded library and install it in local maven repository `build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests` Step 2: Start Connect Server `connector/connect/bin/spark-connect` Step 3: Launch REPL using the newly created library This step requires [coursier](https://get-coursier.io/) to be installed. 
`cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED -M org.apache.spark.sql.application.ConnectRepl` Step 4: Run a bunch of commands: ```scala // Check version spark.version // Run a simple query { spark.range(1, 1, 1) .select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2")) .groupBy($"group") .agg( avg($"v1").as("v1_avg"), avg($"v2").as("v2_avg")) .show() } // Run a streaming query { import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger val query_name = "simple_streaming" val stream = spark.readStream .format("rate") .option("numPartitions", "1") .option("rowsPerSecond", "10") .load() .withWatermark("timestamp", "10 milliseconds") .groupBy(window(col("timestamp"), "10 milliseconds")) .count() .selectExpr("window.start as timestamp", "count as num_events") .writeStream .format("memory") .queryName(query_name) .trigger(ProcessingTimeTrigger.create("10 milliseconds")) // run for 20 seconds val query = stream.start() val start = System.currentTimeMillis() val end = System.currentTimeMillis() + 20 * 1000 while (System.currentTimeMillis() < end) { println(s"time: ${System.currentTimeMillis() - start} ms") println(query.status) spark.sql(s"select * from ${query_name}").show() Thread.sleep(500) } query.stop() } ``` Closes #43195 from hvanhovell/SPARK-45371. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell (cherry picked from commit e53abbbceaa2c41babaa23fe4c2f282f559b4c03) Signed-off-by: Herman van Hovell --- connector/connect/client/jvm/pom.xml | 39 +++- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml index 67227ef38eb..236e5850b76 100644 --- a/connector/connect/client/jvm/pom.xml +++ b/connector/connect/client/jvm/pom.xml @@ -50,10 +50,20 @@ spark-sketch_${scala.binary.version} ${project.version} + com.google.guava guava ${connect.guava.version} + compile + + + com.google.protobuf + protobuf-java + compile com.lihaoyi @@ -85,6 +95,7 @@ maven-shade-plugin false + true com.google.android:* @@ -92,52 +103,62 @@ com.google.code.findbugs:* com.google.code.gson:* com.google.errorprone:* - com.google.guava:* com.google.j2objc:* com.google.protobuf:* + com.google.flatbuffers:* io.grpc:* io.netty:* io.perfmark:* +
[spark] branch master updated: [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e53abbbceaa [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client e53abbbceaa is described below commit e53abbbceaa2c41babaa23fe4c2f282f559b4c03 Author: Herman van Hovell AuthorDate: Mon Oct 2 13:03:06 2023 -0400 [SPARK-45371][CONNECT] Fix shading issues in the Spark Connect Scala Client ### What changes were proposed in this pull request? This PR fixes shading for the Spark Connect Scala Client maven build. The following things are addressed: - Guava and protobuf are included in the shaded jars. These were missing, and were causing users to see `ClassNotFoundException`s. - Fixed duplicate shading of guava. We use the parent pom's location now. - Fixed duplicate Netty dependency (shaded and transitive). One was used for GRPC and the other was needed by Arrow. This was fixed by pulling arrow into the shaded jar. - Use the same package as the shading defined in the parent package. ### Why are the changes needed? The maven artifacts for the Spark Connect Scala Client are currently broken. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual tests. Step 1: Build new shaded library and install it in local maven repository `build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests` Step 2: Start Connect Server `connector/connect/bin/spark-connect` Step 3: Launch REPL using the newly created library This step requires [coursier](https://get-coursier.io/) to be installed. 
`cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED -M org.apache.spark.sql.application.ConnectRepl` Step 4: Run a bunch of commands: ```scala // Check version spark.version // Run a simple query { spark.range(1, 1, 1) .select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2")) .groupBy($"group") .agg( avg($"v1").as("v1_avg"), avg($"v2").as("v2_avg")) .show() } // Run a streaming query { import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger val query_name = "simple_streaming" val stream = spark.readStream .format("rate") .option("numPartitions", "1") .option("rowsPerSecond", "10") .load() .withWatermark("timestamp", "10 milliseconds") .groupBy(window(col("timestamp"), "10 milliseconds")) .count() .selectExpr("window.start as timestamp", "count as num_events") .writeStream .format("memory") .queryName(query_name) .trigger(ProcessingTimeTrigger.create("10 milliseconds")) // run for 20 seconds val query = stream.start() val start = System.currentTimeMillis() val end = System.currentTimeMillis() + 20 * 1000 while (System.currentTimeMillis() < end) { println(s"time: ${System.currentTimeMillis() - start} ms") println(query.status) spark.sql(s"select * from ${query_name}").show() Thread.sleep(500) } query.stop() } ``` Closes #43195 from hvanhovell/SPARK-45371. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- connector/connect/client/jvm/pom.xml | 39 +++- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml index 9ca66b5c29c..a9040107f38 100644 --- a/connector/connect/client/jvm/pom.xml +++ b/connector/connect/client/jvm/pom.xml @@ -50,10 +50,20 @@ spark-sketch_${scala.binary.version} ${project.version} + com.google.guava guava ${connect.guava.version} + compile + + + com.google.protobuf + protobuf-java + compile com.lihaoyi @@ -85,6 +95,7 @@ maven-shade-plugin false + true com.google.android:* @@ -92,52 +103,62 @@ com.google.code.findbugs:* com.google.code.gson:* com.google.errorprone:* - com.google.guava:* com.google.j2objc:* com.google.protobuf:* + com.google.flatbuffers:* io.grpc:* io.netty:* io.perfmark:* + org.apache.arrow:* org.codehaus.mojo:* org.checkerframework:*
[spark] branch master updated: [SPARK-45377][CORE] Handle InputStream in NettyLogger
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cdbb301143d [SPARK-45377][CORE] Handle InputStream in NettyLogger cdbb301143d is described below commit cdbb301143de2e9a0ea525d20867948f49863842 Author: Hasnain Lakhani AuthorDate: Mon Oct 2 08:27:50 2023 -0500 [SPARK-45377][CORE] Handle InputStream in NettyLogger ### What changes were proposed in this pull request? Handle `InputStream`s in the `NettyLogger` so we can print out how many available bytes there are. ### Why are the changes needed? As part of the SSL support we are going to transfer `InputStream`s via Netty, and this functionality makes it easy to see the size of the streams in the log at a glance. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI. Tested as part of the changes in https://github.com/apache/spark/pull/42685 which this is split out of, I observed the logs there. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43165 from hasnain-db/spark-tls-netty-logger. 
Authored-by: Hasnain Lakhani Signed-off-by: Sean Owen --- .../main/java/org/apache/spark/network/util/NettyLogger.java | 11 +++ 1 file changed, 11 insertions(+) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java index 9398726a926..f4c0df6239d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyLogger.java @@ -17,6 +17,9 @@ package org.apache.spark.network.util; +import java.io.IOException; +import java.io.InputStream; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelHandlerContext; @@ -42,6 +45,14 @@ public class NettyLogger { } else if (arg instanceof ByteBufHolder) { return format(ctx, eventName) + " " + ((ByteBufHolder) arg).content().readableBytes() + "B"; + } else if (arg instanceof InputStream) { +int available = -1; +try { + available = ((InputStream) arg).available(); +} catch (IOException ex) { + // Swallow, but return -1 to indicate an error happened +} +return format(ctx, eventName, arg) + " " + available + "B"; } else { return super.format(ctx, eventName, arg); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45389][SQL][HIVE] Correct MetaException matching rule on getting partition metadata
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8b3ad2fc329 [SPARK-45389][SQL][HIVE] Correct MetaException matching rule on getting partition metadata 8b3ad2fc329 is described below commit 8b3ad2fc329e1813366430df7189d27b17133283 Author: Cheng Pan AuthorDate: Mon Oct 2 08:25:51 2023 -0500 [SPARK-45389][SQL][HIVE] Correct MetaException matching rule on getting partition metadata ### What changes were proposed in this pull request? This PR aims to fix the HMS call fallback logic introduced in SPARK-35437. ```patch try { ... hive.getPartitionNames ... hive.getPartitionsByNames } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => + case ex: HiveException if ex.getCause.isInstanceOf[MetaException] => ... } ``` ### Why are the changes needed? A direct method call won't throw `InvocationTargetException`. Checking the code of `hive.getPartitionNames` and `hive.getPartitionsByNames` shows that both of them wrap a thrown `MetaException` in a `HiveException`. ### Does this PR introduce _any_ user-facing change? Yes, it should be a bug fix. ### How was this patch tested? Pass GA and code review. (I'm not sure how to construct/simulate a MetaException during the HMS thrift call with the current HMS testing infrastructure) ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43191 from pan3793/SPARK-45389.
Authored-by: Cheng Pan Signed-off-by: Sean Owen --- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 64aa7d2d6fa..9943c0178fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -438,7 +438,7 @@ private[client] class Shim_v2_0 extends Shim with Logging { recordHiveCall() hive.getPartitionsByNames(table, partNames.asJava) } catch { -case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => +case ex: HiveException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from client side. Falling back to fetching all partition metadata", ex) recordHiveCall() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 845e4f6c5bc [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs 845e4f6c5bc is described below commit 845e4f6c5bcf3a368ee78757f3a74b390cdce5c0 Author: Peter Kaszt AuthorDate: Mon Oct 2 07:48:56 2023 -0500 [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs Fix Python language code sample in the docs for _StreamingQueryListener_: Reporting Metrics programmatically using Asynchronous APIs section. ### What changes were proposed in this pull request? The code sample in the [Reporting Metrics programmatically using Asynchronous APIs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis) section was this: ``` spark = ... class Listener(StreamingQueryListener): def onQueryStarted(self, event): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): println("Query terminated: " + queryTerminated.id) def onQueryTerminated(self, event): println("Query made progress: " + queryProgress.progress) spark.streams.addListener(Listener()) ``` This is not proper Python code, and it has the QueryProgress and QueryTerminated prints mixed up. Proposed change/fix: ``` spark = ... class Listener(StreamingQueryListener): def onQueryStarted(self, event): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): print("Query made progress: " + queryProgress.progress) def onQueryTerminated(self, event): print("Query terminated: " + queryTerminated.id) spark.streams.addListener(Listener()) ``` ### Why are the changes needed? To fix documentation errors.
### Does this PR introduce _any_ user-facing change? Yes. -> Sample python code snippet is fixed in docs (see above). ### How was this patch tested? Checked with github's .md preview, and built the docs according to the readme. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43190 from kasztp/master. Authored-by: Peter Kaszt Signed-off-by: Sean Owen (cherry picked from commit d708fd7b68bf0c9964e861cb2c81818d17d7136e) Signed-off-by: Sean Owen --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 76a22621a0e..3e87c45a349 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -3831,10 +3831,10 @@ class Listener(StreamingQueryListener): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): -println("Query terminated: " + queryTerminated.id) +print("Query made progress: " + queryProgress.progress) def onQueryTerminated(self, event): -println("Query made progress: " + queryProgress.progress) + print("Query terminated: " + queryTerminated.id) spark.streams.addListener(Listener()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d708fd7b68b [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs d708fd7b68b is described below commit d708fd7b68bf0c9964e861cb2c81818d17d7136e Author: Peter Kaszt AuthorDate: Mon Oct 2 07:48:56 2023 -0500 [MINOR][DOCS] Fix Python code sample for StreamingQueryListener: Reporting Metrics programmatically using Asynchronous APIs Fix Python language code sample in the docs for _StreamingQueryListener_: Reporting Metrics programmatically using Asynchronous APIs section. ### What changes were proposed in this pull request? The code sample in the [Reporting Metrics programmatically using Asynchronous APIs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis) section was this: ``` spark = ... class Listener(StreamingQueryListener): def onQueryStarted(self, event): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): println("Query terminated: " + queryTerminated.id) def onQueryTerminated(self, event): println("Query made progress: " + queryProgress.progress) spark.streams.addListener(Listener()) ``` This is not proper Python code, and it has the QueryProgress and QueryTerminated prints mixed up. Proposed change/fix: ``` spark = ... class Listener(StreamingQueryListener): def onQueryStarted(self, event): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): print("Query made progress: " + queryProgress.progress) def onQueryTerminated(self, event): print("Query terminated: " + queryTerminated.id) spark.streams.addListener(Listener()) ``` ### Why are the changes needed? To fix documentation errors.
### Does this PR introduce _any_ user-facing change? Yes. -> Sample python code snippet is fixed in docs (see above). ### How was this patch tested? Checked with github's .md preview, and built the docs according to the readme. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43190 from kasztp/master. Authored-by: Peter Kaszt Signed-off-by: Sean Owen --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 70e763be0d7..774422a9cd9 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -3837,10 +3837,10 @@ class Listener(StreamingQueryListener): print("Query started: " + queryStarted.id) def onQueryProgress(self, event): -println("Query terminated: " + queryTerminated.id) +print("Query made progress: " + queryProgress.progress) def onQueryTerminated(self, event): -println("Query made progress: " + queryProgress.progress) + print("Query terminated: " + queryTerminated.id) spark.streams.addListener(Listener()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a0c9ab63f3b [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188) a0c9ab63f3b is described below commit a0c9ab63f3bcf4c9bb56c407375ce1c8cc26fb02 Author: Emil Ejbyfeldt AuthorDate: Mon Oct 2 11:36:53 2023 +0200 [SPARK-45386][SQL]: Fix correctness issue with persist using StorageLevel.NONE on Dataset (#43188) * SPARK-45386: Fix correctness issue with StorageLevel.NONE * Move to CacheManager * Add comment --- .../main/scala/org/apache/spark/sql/execution/CacheManager.scala| 4 +++- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 064819275e0..e906c74f8a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { planToCache: LogicalPlan, tableName: Option[String], storageLevel: StorageLevel): Unit = { -if (lookupCachedData(planToCache).nonEmpty) { +if (storageLevel == StorageLevel.NONE) { + // Do nothing for StorageLevel.NONE since it will not actually cache any data. 
+} else if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 04e619fa908..8fb25e120f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.storage.StorageLevel case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2) case class TestDataPoint2(x: Int, s: String) @@ -2604,6 +2605,11 @@ class DatasetSuite extends QueryTest parameters = Map("cls" -> classOf[Array[Int]].getName)) } } + + test("SPARK-45386: persist with StorageLevel.NONE should give correct count") { +val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE) +assert(ds.count() == 2) + } } class DatasetLargeResultCollectingSuite extends QueryTest - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org