[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
[ https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186855#comment-17186855 ] Vladimir Matveev commented on SPARK-32385: -- > I am still not quite sure what this gains if it doesn't change dependency > resolution. It does not change dependency resolution by itself, it adds an ability for the user, if they want to, to automatically lock onto versions explicitly declared by the Spark project. So yeah, this: > Is it just that you declare one artifact POM to depend on that declares a > bunch of dependent versions, so people don't go depending on different > versions? pretty much summarizes it; this could be expanded to say that (depending on the build tool) it may also enforce these dependent versions in case of conflicts. > I mean people can already do that by setting some spark.version property in > their build. They can't in general, because while it will enforce the Spark's own version, it won't necessarily determine the versions of transitive dependencies. The latter will only happen when the consumer also uses Maven, and when they have a particular order of dependencies in their POM declaration (e.g. no newer Jackson version declared transitively lexically earlier than Spark). > What is important is: if we change the build and it changes Spark's > transitive dependencies for downstream users, that could be a breaking change. My understanding is that this should not happen, unless the user explicitly opts into using the BOM, in which case it arguably changes the situation for the better in most cases, because now versions are guaranteed to align with Spark's declarations. > Publish a "bill of materials" (BOM) descriptor for Spark with correct > versions of various dependencies > -- > > Key: SPARK-32385 > URL: https://issues.apache.org/jira/browse/SPARK-32385 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.1.0 >Reporter: Vladimir Matveev >Priority: Major > > Spark has a lot of dependencies, many of them very common (e.g. Guava, > Jackson). Also, versions of these dependencies are not updated as frequently > as they are released upstream, which is totally understandable and natural, > but which also means that often Spark has a dependency on a lower version of > a library, which is incompatible with a higher, more recent version of the > same library. This incompatibility can manifest in different ways, e.g as > classpath errors or runtime check errors (like with Jackson), in certain > cases. > > Spark does attempt to "fix" versions of its dependencies by declaring them > explicitly in its {{pom.xml}} file. However, this approach, being somewhat > workable if the Spark-using project itself uses Maven, breaks down if another > build system is used, like Gradle. The reason is that Maven uses an > unconventional "nearest first" version conflict resolution strategy, while > many other tools like Gradle use the "highest first" strategy which resolves > the highest possible version number inside the entire graph of dependencies. > This means that other dependencies of the project can pull a higher version > of some dependency, which is incompatible with Spark. > > One example would be an explicit or a transitive dependency on a higher > version of Jackson in the project. Spark itself depends on several modules of > Jackson; if only one of them gets a higher version, and others remain on the > lower version, this will result in runtime exceptions due to an internal > version check in Jackson. > > A widely used solution for this kind of version issues is publishing of a > "bill of materials" descriptor (see here: > [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > and here: > [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). > This descriptor would contain all versions of all dependencies of Spark; then > downstream projects will be able to use their build system's support for BOMs > to enforce version constraints required for Spark to function correctly. > > One example of successful implementation of the BOM-based approach is Spring: > [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring > projects, e.g. Spring Boot, there are BOM descriptors published which can be > used in downstream projects to fix the versions of Spring components and > their dependencies, significantly reducing confusion around proper version > numbers. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark
[jira] [Comment Edited] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
[ https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186846#comment-17186846 ] Vladimir Matveev edited comment on SPARK-32385 at 8/28/20, 11:37 PM: - [~srowen] Sorry for the delayed response! > This requires us fixing every version of every transitive dependency. How > does that get updated as the transitive dependency graph changes? this > exchanges one problem for another I think. That is, we are definitely not > trying to fix dependency versions except where necessary. I don't think this is right — you don't have to fix more than just direct dependencies, like you already do. It's pretty much the same thing as defining the version numbers like [here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197] and then declaring specific dependencies with the versions below. It's just it is done slightly differently, by using Maven's {{}} mechanism and POM inheritance (for Maven; for Gradle e.g. it would be this "platform" thing). > Gradle isn't something that this project supports, but, wouldn't this be a > much bigger general problem if its resolution rules are different from Maven? > that is, surely gradle can emulate Maven if necessary. I don't think Gradle can emulate Maven, and I personally don't think it should, because Maven's strategy for conflict resolution is quite unconventional, and is not used by most of the dependency management tools, not just in the Java world. Also, I naturally don't have statistics, so this is just my speculation, but it seems likely to me that most of the downstream projects which use Spark don't actually use Maven for dependency management, especially given its Scala heritage. Therefore, they can't take advantage of Maven's dependency resolution algorithm and the current Spark's POM configuration. Also I'd like to point out again that this whole BOM mechanism is something which _Maven_ supports natively, it's not a Gradle extension or something. The BOM concept originated in Maven, and it is declared using Maven's {{}} block, which is a part of POM syntax. Hopefully this would reduce some of the concerns about it. was (Author: netvl): Sorry for the delayed response! > This requires us fixing every version of every transitive dependency. How > does that get updated as the transitive dependency graph changes? this > exchanges one problem for another I think. That is, we are definitely not > trying to fix dependency versions except where necessary. I don't think this is right — you don't have to fix more than just direct dependencies, like you already do. It's pretty much the same thing as defining the version numbers like [here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197] and then declaring specific dependencies with the versions below. It's just it is done slightly differently, by using Maven's `` mechanism and POM inheritance (for Maven; for Gradle e.g. it would be this "platform" thing). > Gradle isn't something that this project supports, but, wouldn't this be a > much bigger general problem if its resolution rules are different from Maven? > that is, surely gradle can emulate Maven if necessary. I don't think Gradle can emulate Maven, and I personally don't think it should, because Maven's strategy for conflict resolution is quite unconventional, and is not used by most of the dependency management tools, not just in the Java world. Also, I naturally don't have statistics, so this is just my speculation, but it seems likely to me that most of the downstream projects which use Spark don't actually use Maven for dependency management, especially given its Scala heritage. Therefore, they can't take advantage of Maven's dependency resolution algorithm and the current Spark's POM configuration. Also I'd like to point out again that this whole BOM mechanism is something which _Maven_ supports natively, it's not a Gradle extension or something. The BOM concept originated in Maven, and it is declared using Maven's {{}} block, which is a part of POM syntax. Hopefully this would reduce some of the concerns about it. > Publish a "bill of materials" (BOM) descriptor for Spark with correct > versions of various dependencies > -- > > Key: SPARK-32385 > URL: https://issues.apache.org/jira/browse/SPARK-32385 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.1.0 >Reporter: Vladimir Matveev >Priority: Major > > Spark has a lot of dependencies, many of them very common (e.g. Guava, > Jackson). Also, versions of these dependencies are not updated as freq
[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
[ https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186846#comment-17186846 ] Vladimir Matveev commented on SPARK-32385: -- Sorry for the delayed response! > This requires us fixing every version of every transitive dependency. How > does that get updated as the transitive dependency graph changes? this > exchanges one problem for another I think. That is, we are definitely not > trying to fix dependency versions except where necessary. I don't think this is right — you don't have to fix more than just direct dependencies, like you already do. It's pretty much the same thing as defining the version numbers like [here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197] and then declaring specific dependencies with the versions below. It's just it is done slightly differently, by using Maven's `` mechanism and POM inheritance (for Maven; for Gradle e.g. it would be this "platform" thing). > Gradle isn't something that this project supports, but, wouldn't this be a > much bigger general problem if its resolution rules are different from Maven? > that is, surely gradle can emulate Maven if necessary. I don't think Gradle can emulate Maven, and I personally don't think it should, because Maven's strategy for conflict resolution is quite unconventional, and is not used by most of the dependency management tools, not just in the Java world. Also, I naturally don't have statistics, so this is just my speculation, but it seems likely to me that most of the downstream projects which use Spark don't actually use Maven for dependency management, especially given its Scala heritage. Therefore, they can't take advantage of Maven's dependency resolution algorithm and the current Spark's POM configuration. Also I'd like to point out again that this whole BOM mechanism is something which _Maven_ supports natively, it's not a Gradle extension or something. The BOM concept originated in Maven, and it is declared using Maven's {{}} block, which is a part of POM syntax. Hopefully this would reduce some of the concerns about it. > Publish a "bill of materials" (BOM) descriptor for Spark with correct > versions of various dependencies > -- > > Key: SPARK-32385 > URL: https://issues.apache.org/jira/browse/SPARK-32385 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.1.0 >Reporter: Vladimir Matveev >Priority: Major > > Spark has a lot of dependencies, many of them very common (e.g. Guava, > Jackson). Also, versions of these dependencies are not updated as frequently > as they are released upstream, which is totally understandable and natural, > but which also means that often Spark has a dependency on a lower version of > a library, which is incompatible with a higher, more recent version of the > same library. This incompatibility can manifest in different ways, e.g as > classpath errors or runtime check errors (like with Jackson), in certain > cases. > > Spark does attempt to "fix" versions of its dependencies by declaring them > explicitly in its {{pom.xml}} file. However, this approach, being somewhat > workable if the Spark-using project itself uses Maven, breaks down if another > build system is used, like Gradle. The reason is that Maven uses an > unconventional "nearest first" version conflict resolution strategy, while > many other tools like Gradle use the "highest first" strategy which resolves > the highest possible version number inside the entire graph of dependencies. > This means that other dependencies of the project can pull a higher version > of some dependency, which is incompatible with Spark. > > One example would be an explicit or a transitive dependency on a higher > version of Jackson in the project. Spark itself depends on several modules of > Jackson; if only one of them gets a higher version, and others remain on the > lower version, this will result in runtime exceptions due to an internal > version check in Jackson. > > A widely used solution for this kind of version issues is publishing of a > "bill of materials" descriptor (see here: > [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > and here: > [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). > This descriptor would contain all versions of all dependencies of Spark; then > downstream projects will be able to use their build system's support for BOMs > to enforce version constraints required for Spark to function correctly. > > One example of successful implementation of the BOM-based approach is Spring: > [https://w
[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
[ https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178581#comment-17178581 ] Vladimir Matveev commented on SPARK-32385: -- [~srowen] a BOM descriptor can be used as a "platform" in Gradle and most likely in Maven (I don't know for sure, but the concept of BOM originates from Maven, so supposedly the tool itself supports it) to enforce compatible version numbers in the dependency graph. Just regular POMs cannot do this, because a regular POM forms just a single node in a dependency graph, and most of the dependency resolution tools take the entire graph of dependencies into account, which may result in accidentally bumped versions somewhere, require manual and ad-hoc resolution in most cases. With BOM, it is sufficient to tell the dependency engine that it should use this BOM to enforce versions, and that's it - the versions will now be fixed to the versions declared by the framework (Spark in this case). As I said, Spring framework uses this concept to a great success to ensure that applications using Spring always have compatible and tested versions. Naturally, `deps/` files is just a list of jar files, and cannot be used for dependency resolution. Also note that such a BOM descriptor would allow to centralize the version declarations within the Spark project itself, so it won't be something "on top" to support, at least as far as I understand it. > Publish a "bill of materials" (BOM) descriptor for Spark with correct > versions of various dependencies > -- > > Key: SPARK-32385 > URL: https://issues.apache.org/jira/browse/SPARK-32385 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.1.0 >Reporter: Vladimir Matveev >Priority: Major > > Spark has a lot of dependencies, many of them very common (e.g. Guava, > Jackson). Also, versions of these dependencies are not updated as frequently > as they are released upstream, which is totally understandable and natural, > but which also means that often Spark has a dependency on a lower version of > a library, which is incompatible with a higher, more recent version of the > same library. This incompatibility can manifest in different ways, e.g as > classpath errors or runtime check errors (like with Jackson), in certain > cases. > > Spark does attempt to "fix" versions of its dependencies by declaring them > explicitly in its {{pom.xml}} file. However, this approach, being somewhat > workable if the Spark-using project itself uses Maven, breaks down if another > build system is used, like Gradle. The reason is that Maven uses an > unconventional "nearest first" version conflict resolution strategy, while > many other tools like Gradle use the "highest first" strategy which resolves > the highest possible version number inside the entire graph of dependencies. > This means that other dependencies of the project can pull a higher version > of some dependency, which is incompatible with Spark. > > One example would be an explicit or a transitive dependency on a higher > version of Jackson in the project. Spark itself depends on several modules of > Jackson; if only one of them gets a higher version, and others remain on the > lower version, this will result in runtime exceptions due to an internal > version check in Jackson. > > A widely used solution for this kind of version issues is publishing of a > "bill of materials" descriptor (see here: > [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > and here: > [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). > This descriptor would contain all versions of all dependencies of Spark; then > downstream projects will be able to use their build system's support for BOMs > to enforce version constraints required for Spark to function correctly. > > One example of successful implementation of the BOM-based approach is Spring: > [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring > projects, e.g. Spring Boot, there are BOM descriptors published which can be > used in downstream projects to fix the versions of Spring components and > their dependencies, significantly reducing confusion around proper version > numbers. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
[ https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167527#comment-17167527 ] Vladimir Matveev commented on SPARK-32385: -- [~hyukjin.kwon] almost: those are just lists of the artifacts in distribution, while BOMs are proper Maven POM descriptors which contain information about dependencies in terms of Maven coordinates. This makes BOMs usable directly as input to build systems like Gradle. Still, the general idea is similar, I guess. > Publish a "bill of materials" (BOM) descriptor for Spark with correct > versions of various dependencies > -- > > Key: SPARK-32385 > URL: https://issues.apache.org/jira/browse/SPARK-32385 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.1.0 >Reporter: Vladimir Matveev >Priority: Major > > Spark has a lot of dependencies, many of them very common (e.g. Guava, > Jackson). Also, versions of these dependencies are not updated as frequently > as they are released upstream, which is totally understandable and natural, > but which also means that often Spark has a dependency on a lower version of > a library, which is incompatible with a higher, more recent version of the > same library. This incompatibility can manifest in different ways, e.g as > classpath errors or runtime check errors (like with Jackson), in certain > cases. > > Spark does attempt to "fix" versions of its dependencies by declaring them > explicitly in its {{pom.xml}} file. However, this approach, being somewhat > workable if the Spark-using project itself uses Maven, breaks down if another > build system is used, like Gradle. The reason is that Maven uses an > unconventional "nearest first" version conflict resolution strategy, while > many other tools like Gradle use the "highest first" strategy which resolves > the highest possible version number inside the entire graph of dependencies. > This means that other dependencies of the project can pull a higher version > of some dependency, which is incompatible with Spark. > > One example would be an explicit or a transitive dependency on a higher > version of Jackson in the project. Spark itself depends on several modules of > Jackson; if only one of them gets a higher version, and others remain on the > lower version, this will result in runtime exceptions due to an internal > version check in Jackson. > > A widely used solution for this kind of version issues is publishing of a > "bill of materials" descriptor (see here: > [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] > and here: > [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). > This descriptor would contain all versions of all dependencies of Spark; then > downstream projects will be able to use their build system's support for BOMs > to enforce version constraints required for Spark to function correctly. > > One example of successful implementation of the BOM-based approach is Spring: > [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring > projects, e.g. Spring Boot, there are BOM descriptors published which can be > used in downstream projects to fix the versions of Spring components and > their dependencies, significantly reducing confusion around proper version > numbers. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies
Vladimir Matveev created SPARK-32385: Summary: Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies Key: SPARK-32385 URL: https://issues.apache.org/jira/browse/SPARK-32385 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 3.0.0, 2.4.6 Reporter: Vladimir Matveev Spark has a lot of dependencies, many of them very common (e.g. Guava, Jackson). Also, versions of these dependencies are not updated as frequently as they are released upstream, which is totally understandable and natural, but which also means that often Spark has a dependency on a lower version of a library, which is incompatible with a higher, more recent version of the same library. This incompatibility can manifest in different ways, e.g as classpath errors or runtime check errors (like with Jackson), in certain cases. Spark does attempt to "fix" versions of its dependencies by declaring them explicitly in its {{pom.xml}} file. However, this approach, being somewhat workable if the Spark-using project itself uses Maven, breaks down if another build system is used, like Gradle. The reason is that Maven uses an unconventional "nearest first" version conflict resolution strategy, while many other tools like Gradle use the "highest first" strategy which resolves the highest possible version number inside the entire graph of dependencies. This means that other dependencies of the project can pull a higher version of some dependency, which is incompatible with Spark. One example would be an explicit or a transitive dependency on a higher version of Jackson in the project. Spark itself depends on several modules of Jackson; if only one of them gets a higher version, and others remain on the lower version, this will result in runtime exceptions due to an internal version check in Jackson. A widely used solution for this kind of version issues is publishing of a "bill of materials" descriptor (see here: [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] and here: [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). This descriptor would contain all versions of all dependencies of Spark; then downstream projects will be able to use their build system's support for BOMs to enforce version constraints required for Spark to function correctly. One example of successful implementation of the BOM-based approach is Spring: [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring projects, e.g. Spring Boot, there are BOM descriptors published which can be used in downstream projects to fix the versions of Spring components and their dependencies, significantly reducing confusion around proper version numbers. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results
[ https://issues.apache.org/jira/browse/SPARK-28321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Matveev updated SPARK-28321: - Description: It looks like that the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is wrong ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061|https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061):]): {code:java} def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = { val func = f.asInstanceOf[UDF0[Any]].call() SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None)) } {code} Here the UDF passed as the first argument will be called *right inside the `udf` method* on the driver, rather than at the dataframe computation time on executors. One of the major issues here is that non-deterministic UDFs (e.g. generating a random value) will produce unexpected results: {code:java} val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic() val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic() (1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show() // prints +---+-+ | scala| java| +---+-+ | 934190385|478543809| |-1082102515|478543809| | 774466710|478543809| | 1883582103|478543809| |-1959743031|478543809| | 1534685218|478543809| | 1158899264|478543809| |-1572590653|478543809| | -309451364|478543809| | -906574467|478543809| | -436584308|478543809| | 1598340674|478543809| |-1331343156|478543809| |-1804177830|478543809| |-1682906106|478543809| | -197444289|478543809| | 260603049|478543809| |-1993515667|478543809| |-1304685845|478543809| | 481017016|478543809| +---+-{code} Note that the version which relies on a different overload of the `functions.udf` method works correctly. was: It looks like that the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is wrong ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061):] {code:java} def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = { val func = f.asInstanceOf[UDF0[Any]].call() SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None)) } {code} Here the UDF passed as the first argument will be called *right inside the `udf` method* on the driver, rather than at the dataframe computation time on executors. One of the major issues here is that non-deterministic UDFs (e.g. generating a random value) will produce unexpected results: {code:java} val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic() val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic() (1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show() // prints +---+-+ | scala| java| +---+-+ | 934190385|478543809| |-1082102515|478543809| | 774466710|478543809| | 1883582103|478543809| |-1959743031|478543809| | 1534685218|478543809| | 1158899264|478543809| |-1572590653|478543809| | -309451364|478543809| | -906574467|478543809| | -436584308|478543809| | 1598340674|478543809| |-1331343156|478543809| |-1804177830|478543809| |-1682906106|478543809| | -197444289|478543809| | 260603049|478543809| |-1993515667|478543809| |-1304685845|478543809| | 481017016|478543809| +---+-{code} Note that the version which relies on a different overload of the `functions.udf` method works correctly. > functions.udf(UDF0, DataType) produces unexpected results > - > > Key: SPARK-28321 > URL: https://issues.apache.org/jira/browse/SPARK-28321 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.2, 2.4.3 >Reporter: Vladimir Matveev >Priority: Major > > It looks like that the `f.udf(UDF0, DataType)` variant of the UDF > Column-creating methods is wrong > ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061|https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061):]): > > {code:java} > def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = { > val func = f.asInstanceOf[UDF0[Any]].call() > SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = > Seq.fill(0)(Non
[jira] [Created] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results
Vladimir Matveev created SPARK-28321: Summary: functions.udf(UDF0, DataType) produces unexpected results Key: SPARK-28321 URL: https://issues.apache.org/jira/browse/SPARK-28321 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.3, 2.3.2 Reporter: Vladimir Matveev It looks like that the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is wrong ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061):] {code:java} def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = { val func = f.asInstanceOf[UDF0[Any]].call() SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None)) } {code} Here the UDF passed as the first argument will be called *right inside the `udf` method* on the driver, rather than at the dataframe computation time on executors. One of the major issues here is that non-deterministic UDFs (e.g. generating a random value) will produce unexpected results: {code:java} val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic() val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic() (1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show() // prints +---+-+ | scala| java| +---+-+ | 934190385|478543809| |-1082102515|478543809| | 774466710|478543809| | 1883582103|478543809| |-1959743031|478543809| | 1534685218|478543809| | 1158899264|478543809| |-1572590653|478543809| | -309451364|478543809| | -906574467|478543809| | -436584308|478543809| | 1598340674|478543809| |-1331343156|478543809| |-1804177830|478543809| |-1682906106|478543809| | -197444289|478543809| | 260603049|478543809| |-1993515667|478543809| |-1304685845|478543809| | 481017016|478543809| +---+-{code} Note that the version which relies on a different overload of the `functions.udf` method works correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27808) Ability to ignore existing files for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848066#comment-16848066 ] Vladimir Matveev commented on SPARK-27808: -- Hello Gabor, The issue is about structured streaming. I've provided a link to the old dstreams documentation to illustrate an option which was available there but is not available now. A different sink implementation is still more like a workaround rather than a proper solution. I'll have to do extra steps, potentially changing the code, to do what I need, rather than changing a configuration option. Also, I'm not sure about the exact implementation, but I suspect it is possible that even though this new sink does not do anything, running the query with it would still mean that the entire input will be read. I would very much like to avoid it because in our case the input is very large and has infinite retention. > Ability to ignore existing files for structured streaming > - > > Key: SPARK-27808 > URL: https://issues.apache.org/jira/browse/SPARK-27808 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.3, 2.4.3 >Reporter: Vladimir Matveev >Priority: Major > > Currently it is not easily possible to make a structured streaming query to > ignore all of the existing data inside a directory and only process new > files, created after the job was started. See here for example: > [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] > > My use case is to ignore everything which existed in the directory when the > streaming job is first started (and there are no checkpoints), but to behave > as usual when the stream is restarted, e.g. catch up reading new files since > the last restart. This would allow us to use the streaming job for continuous > processing, with all the benefits it brings, but also to keep the possibility > to reprocess the data in the batch fashion by a different job, drop the > checkpoints and make the streaming job only run for the new data. > > It would be great to have an option similar to the `newFilesOnly` option on > the original StreamingContext.fileStream method: > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])] > but probably with slightly different semantics, described above (ignore all > existing for the first run, catch up for the following runs)> -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Matveev updated SPARK-27808: - Description: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original StreamingContext.fileStream method: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]] but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> was: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original [StreamingContext.fileStream|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]] method, but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> > Ability to ignore existing files for structured streaming > - > > Key: SPARK-27808 > URL: https://issues.apache.org/jira/browse/SPARK-27808 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.3, 2.4.3 >Reporter: Vladimir Matveev >Priority: Major > > Currently it is not easily possible to make a structured streaming query to > ignore all of the existing data inside a directory and only process new > files, created after the job was started. See here for example: > [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] > > My use case is to ignore everything which existed in the directory when the > streaming job is first started (and there are no checkpoints), but to behave > as usual when the stream is restarted, e.g. catch up reading new files since > the last restart. This would allow us to use the streaming job for continuous > processing, with all the benefits it brings, but also to keep the possibility > to reprocess the data in the batch fashion by a different job, drop the > checkpoints and make the streaming job only run for the new data. > > It would be great to have an option similar to the `newFilesOnly` option on > the original StreamingContext.fileStream method: > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:S
[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Matveev updated SPARK-27808: - Description: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original StreamingContext.fileStream method: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])] but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> was: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original StreamingContext.fileStream method: [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])] but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> > Ability to ignore existing files for structured streaming > - > > Key: SPARK-27808 > URL: https://issues.apache.org/jira/browse/SPARK-27808 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.3, 2.4.3 >Reporter: Vladimir Matveev >Priority: Major > > Currently it is not easily possible to make a structured streaming query to > ignore all of the existing data inside a directory and only process new > files, created after the job was started. See here for example: > [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] > > My use case is to ignore everything which existed in the directory when the > streaming job is first started (and there are no checkpoints), but to behave > as usual when the stream is restarted, e.g. catch up reading new files since > the last restart. This would allow us to use the streaming job for continuous > processing, with all the benefits it brings, but also to keep the possibility > to reprocess the data in the batch fashion by a different job, drop the > checkpoints and make the streaming job only run for the new data. > > It would be great to have an option similar to the `newFilesOnly` option on > the original StreamingContext.fileStream method: > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:S
[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming
[ https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Matveev updated SPARK-27808: - Description: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original StreamingContext.fileStream method: [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])] but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> was: Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original StreamingContext.fileStream method: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]] but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> > Ability to ignore existing files for structured streaming > - > > Key: SPARK-27808 > URL: https://issues.apache.org/jira/browse/SPARK-27808 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.3, 2.4.3 >Reporter: Vladimir Matveev >Priority: Major > > Currently it is not easily possible to make a structured streaming query to > ignore all of the existing data inside a directory and only process new > files, created after the job was started. See here for example: > [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] > > My use case is to ignore everything which existed in the directory when the > streaming job is first started (and there are no checkpoints), but to behave > as usual when the stream is restarted, e.g. catch up reading new files since > the last restart. This would allow us to use the streaming job for continuous > processing, with all the benefits it brings, but also to keep the possibility > to reprocess the data in the batch fashion by a different job, drop the > checkpoints and make the streaming job only run for the new data. > > It would be great to have an option similar to the `newFilesOnly` option on > the original StreamingContext.fileStream method: > [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:
[jira] [Created] (SPARK-27808) Ability to ignore existing files for structured streaming
Vladimir Matveev created SPARK-27808: Summary: Ability to ignore existing files for structured streaming Key: SPARK-27808 URL: https://issues.apache.org/jira/browse/SPARK-27808 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 2.4.3, 2.3.3 Reporter: Vladimir Matveev Currently it is not easily possible to make a structured streaming query to ignore all of the existing data inside a directory and only process new files, created after the job was started. See here for example: [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset] My use case is to ignore everything which existed in the directory when the streaming job is first started (and there are no checkpoints), but to behave as usual when the stream is restarted, e.g. catch up reading new files since the last restart. This would allow us to use the streaming job for continuous processing, with all the benefits it brings, but also to keep the possibility to reprocess the data in the batch fashion by a different job, drop the checkpoints and make the streaming job only run for the new data. It would be great to have an option similar to the `newFilesOnly` option on the original [StreamingContext.fileStream|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]] method, but probably with slightly different semantics, described above (ignore all existing for the first run, catch up for the following runs)> -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26723) Spark web UI only shows parts of SQL query graphs for queries with persist operations
[ https://issues.apache.org/jira/browse/SPARK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Matveev updated SPARK-26723: - Attachment: Screen Shot 2019-01-24 at 4.13.14 PM.png Screen Shot 2019-01-24 at 4.13.02 PM.png > Spark web UI only shows parts of SQL query graphs for queries with persist > operations > - > > Key: SPARK-26723 > URL: https://issues.apache.org/jira/browse/SPARK-26723 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: Vladimir Matveev >Priority: Major > Attachments: Screen Shot 2019-01-24 at 4.13.02 PM.png, Screen Shot > 2019-01-24 at 4.13.14 PM.png > > > Currently it looks like the SQL view in Spark UI will truncate the graph on > the nodes corresponding to persist operations on the dataframe, only showing > everything after "LocalTableScan". This is *very* inconvenient, because in a > common case when you have a heavy computation and want to persist it before > writing to multiple outputs with some minor preprocessing, you lose almost > the entire graph with potentially very useful information in it. > The query plans below the graph, however, show the full query, including all > computations before persists. Unfortunately, for complex queries looking into > the plan is unfeasible, and graph visualization becomes a very helpful tool; > with persist, it is apparently broken. > You can verify it in Spark Shell with a very simple example: > {code} > import org.apache.spark.sql.{functions => f} > import org.apache.spark.sql.expressions.Window > val query = Vector(1, 2, 3).toDF() > .select(($"value".cast("long") * f.rand).as("value")) > .withColumn("valueAvg", f.avg($"value") over Window.orderBy("value")) > query.show() > query.persist().show() > {code} > Here the same query is executed first without persist, and then with it. If > you now navigate to the Spark web UI SQL page, you'll see two queries, but > their graphs will be radically different: the one without persist will > contain the whole transformation with exchange, sort and window steps, while > the one with persist will only contain only a LocalTableScan step with some > intermediate transformations needed for `show`. > After some looking into Spark code, I think that the reason for this is that > the `org.apache.spark.sql.execution.SparkPlanInfo#fromSparkPlan` method > (which is used to serialize a plan before emitting the > SparkListenerSQLExecutionStart event) constructs the `SparkPlanInfo` object > from a `SparkPlan` object incorrectly, because if you invoke the `toString` > method on `SparkPlan` you'll see the entire plan, but the `SparkPlanInfo` > object will only contain nodes corresponding to actions after `persist`. > However, my knowledge of Spark internals is not deep enough to understand how > to fix this, and how SparkPlanInfo.fromSparkPlan is different from what > SparkPlan.toString does. > This can be observed on Spark 2.3.2, but given that 2.4.0 code of > SparkPlanInfo does not seem to change much since 2.3.2, I'd expect that it > could be reproduced there too. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26723) Spark web UI only shows parts of SQL query graphs for queries with persist operations
Vladimir Matveev created SPARK-26723: Summary: Spark web UI only shows parts of SQL query graphs for queries with persist operations Key: SPARK-26723 URL: https://issues.apache.org/jira/browse/SPARK-26723 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 2.3.2 Reporter: Vladimir Matveev Currently it looks like the SQL view in Spark UI will truncate the graph on the nodes corresponding to persist operations on the dataframe, only showing everything after "LocalTableScan". This is *very* inconvenient, because in a common case when you have a heavy computation and want to persist it before writing to multiple outputs with some minor preprocessing, you lose almost the entire graph with potentially very useful information in it. The query plans below the graph, however, show the full query, including all computations before persists. Unfortunately, for complex queries looking into the plan is unfeasible, and graph visualization becomes a very helpful tool; with persist, it is apparently broken. You can verify it in Spark Shell with a very simple example: {code} import org.apache.spark.sql.{functions => f} import org.apache.spark.sql.expressions.Window val query = Vector(1, 2, 3).toDF() .select(($"value".cast("long") * f.rand).as("value")) .withColumn("valueAvg", f.avg($"value") over Window.orderBy("value")) query.show() query.persist().show() {code} Here the same query is executed first without persist, and then with it. If you now navigate to the Spark web UI SQL page, you'll see two queries, but their graphs will be radically different: the one without persist will contain the whole transformation with exchange, sort and window steps, while the one with persist will only contain only a LocalTableScan step with some intermediate transformations needed for `show`. After some looking into Spark code, I think that the reason for this is that the `org.apache.spark.sql.execution.SparkPlanInfo#fromSparkPlan` method (which is used to serialize a plan before emitting the SparkListenerSQLExecutionStart event) constructs the `SparkPlanInfo` object from a `SparkPlan` object incorrectly, because if you invoke the `toString` method on `SparkPlan` you'll see the entire plan, but the `SparkPlanInfo` object will only contain nodes corresponding to actions after `persist`. However, my knowledge of Spark internals is not deep enough to understand how to fix this, and how SparkPlanInfo.fromSparkPlan is different from what SparkPlan.toString does. This can be observed on Spark 2.3.2, but given that 2.4.0 code of SparkPlanInfo does not seem to change much since 2.3.2, I'd expect that it could be reproduced there too. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org