[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-08-28 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186855#comment-17186855
 ] 

Vladimir Matveev commented on SPARK-32385:
--

> I am still not quite sure what this gains if it doesn't change dependency 
> resolution.

It does not change dependency resolution by itself; it gives users the 
ability, if they want it, to automatically lock onto the versions explicitly 
declared by the Spark project. So yeah, this:

> Is it just that you declare one artifact POM to depend on that declares a 
> bunch of dependent versions, so people don't go depending on different 
> versions?

pretty much summarizes it; this could be expanded to say that (depending on the 
build tool) it may also enforce these dependent versions in case of conflicts.

> I mean people can already do that by setting some spark.version property in 
> their build.

They can't in general, because while that pins Spark's own version, it won't 
necessarily determine the versions of transitive dependencies. The latter only 
happens when the consumer also uses Maven, and only with a particular order of 
dependencies in their POM declaration (e.g. no newer Jackson version, pulled in 
transitively, declared lexically earlier than Spark).

> What is important is: if we change the build and it changes Spark's 
> transitive dependencies for downstream users, that could be a breaking change.

My understanding is that this should not happen, unless the user explicitly 
opts into using the BOM, in which case it arguably changes the situation for 
the better in most cases, because now versions are guaranteed to align with 
Spark's declarations.

> Publish a "bill of materials" (BOM) descriptor for Spark with correct 
> versions of various dependencies
> --
>
> Key: SPARK-32385
> URL: https://issues.apache.org/jira/browse/SPARK-32385
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.1.0
>Reporter: Vladimir Matveev
>Priority: Major
>
> Spark has a lot of dependencies, many of them very common (e.g. Guava, 
> Jackson). Also, versions of these dependencies are not updated as frequently 
> as they are released upstream, which is totally understandable and natural, 
> but which also means that often Spark has a dependency on a lower version of 
> a library, which is incompatible with a higher, more recent version of the 
> same library. This incompatibility can manifest in different ways, e.g. as 
> classpath errors or runtime check errors (like with Jackson), in certain 
> cases.
>  
> Spark does attempt to "fix" versions of its dependencies by declaring them 
> explicitly in its {{pom.xml}} file. However, this approach, being somewhat 
> workable if the Spark-using project itself uses Maven, breaks down if another 
> build system is used, like Gradle. The reason is that Maven uses an 
> unconventional "nearest first" version conflict resolution strategy, while 
> many other tools like Gradle use the "highest first" strategy which resolves 
> the highest possible version number inside the entire graph of dependencies. 
> This means that other dependencies of the project can pull a higher version 
> of some dependency, which is incompatible with Spark.
>  
> One example would be an explicit or a transitive dependency on a higher 
> version of Jackson in the project. Spark itself depends on several modules of 
> Jackson; if only one of them gets a higher version, and others remain on the 
> lower version, this will result in runtime exceptions due to an internal 
> version check in Jackson.
>  
> A widely used solution for this kind of version issue is publishing a 
> "bill of materials" descriptor (see here: 
> [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
>  and here: 
> [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). 
> This descriptor would contain all versions of all dependencies of Spark; then 
> downstream projects will be able to use their build system's support for BOMs 
> to enforce version constraints required for Spark to function correctly.
>  
> One example of a successful implementation of the BOM-based approach is Spring: 
> [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring 
> projects, e.g. Spring Boot, there are BOM descriptors published which can be 
> used in downstream projects to fix the versions of Spring components and 
> their dependencies, significantly reducing confusion around proper version 
> numbers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark

[jira] [Comment Edited] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-08-28 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186846#comment-17186846
 ] 

Vladimir Matveev edited comment on SPARK-32385 at 8/28/20, 11:37 PM:
-

[~srowen] Sorry for the delayed response!

> This requires us fixing every version of every transitive dependency. How 
> does that get updated as the transitive dependency graph changes? this 
> exchanges one problem for another I think. That is, we are definitely not 
> trying to fix dependency versions except where necessary.

I don't think this is right: you don't have to fix more than the direct 
dependencies, as you already do. It's pretty much the same thing as defining 
the version numbers like 
[here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197]
 and then declaring specific dependencies with those versions below; it's just 
done slightly differently, using Maven's {{<dependencyManagement>}} mechanism 
and POM inheritance (for Maven; in Gradle it would be the "platform" 
mechanism).

> Gradle isn't something that this project supports, but, wouldn't this be a 
> much bigger general problem if its resolution rules are different from Maven? 
> that is, surely gradle can emulate Maven if necessary.

I don't think Gradle can emulate Maven, and I personally don't think it should, 
because Maven's conflict resolution strategy is quite unconventional and is not 
used by most dependency management tools, in the Java world or elsewhere. Also, 
I naturally don't have statistics, so this is just my speculation, but it seems 
likely to me that most downstream projects which use Spark don't actually use 
Maven for dependency management, especially given Spark's Scala heritage. 
Therefore, they can't take advantage of Maven's dependency resolution algorithm 
or of Spark's current POM configuration.

Also, I'd like to point out again that this whole BOM mechanism is something 
which _Maven_ supports natively; it's not a Gradle extension. The BOM concept 
originated in Maven, and a BOM is declared using Maven's 
{{<dependencyManagement>}} block, which is part of the POM syntax. Hopefully 
this reduces some of the concerns about it.


was (Author: netvl):
Sorry for the delayed response!

> This requires us fixing every version of every transitive dependency. How 
> does that get updated as the transitive dependency graph changes? this 
> exchanges one problem for another I think. That is, we are definitely not 
> trying to fix dependency versions except where necessary.

I don't think this is right — you don't have to fix more than just direct 
dependencies, like you already do. It's pretty much the same thing as defining 
the version numbers like 
[here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197]
 and then declaring specific dependencies with the versions below. It's just it 
is done slightly differently, by using Maven's `<dependencyManagement>` 
mechanism and POM inheritance (for Maven; for Gradle e.g. it would be this 
"platform" thing).

> Gradle isn't something that this project supports, but, wouldn't this be a 
> much bigger general problem if its resolution rules are different from Maven? 
> that is, surely gradle can emulate Maven if necessary.

I don't think Gradle can emulate Maven, and I personally don't think it should, 
because Maven's strategy for conflict resolution is quite unconventional, and 
is not used by most of the dependency management tools, not just in the Java 
world. Also, I naturally don't have statistics, so this is just my speculation, 
but it seems likely to me that most of the downstream projects which use Spark 
don't actually use Maven for dependency management, especially given its Scala 
heritage. Therefore, they can't take advantage of Maven's dependency resolution 
algorithm and the current Spark's POM configuration.

Also I'd like to point out again that this whole BOM mechanism is something 
which _Maven_ supports natively, it's not a Gradle extension or something. The 
BOM concept originated in Maven, and it is declared using Maven's 
{{<dependencyManagement>}} block, which is a part of POM syntax. Hopefully this 
would reduce some of the concerns about it.

> Publish a "bill of materials" (BOM) descriptor for Spark with correct 
> versions of various dependencies
> --
>
> Key: SPARK-32385
> URL: https://issues.apache.org/jira/browse/SPARK-32385
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.1.0
>Reporter: Vladimir Matveev
>Priority: Major
>
> Spark has a lot of dependencies, many of them very common (e.g. Guava, 
> Jackson). Also, versions of these dependencies are not updated as freq

[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-08-28 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17186846#comment-17186846
 ] 

Vladimir Matveev commented on SPARK-32385:
--

Sorry for the delayed response!

> This requires us fixing every version of every transitive dependency. How 
> does that get updated as the transitive dependency graph changes? this 
> exchanges one problem for another I think. That is, we are definitely not 
> trying to fix dependency versions except where necessary.

I don't think this is right — you don't have to fix more than just direct 
dependencies, like you already do. It's pretty much the same thing as defining 
the version numbers like 
[here|https://github.com/apache/spark/blob/a0bd273bb04d9a5684e291ec44617972dcd4accd/pom.xml#L121-L197]
 and then declaring specific dependencies with the versions below. It's just it 
is done slightly differently, by using Maven's `<dependencyManagement>` 
mechanism and POM inheritance (for Maven; for Gradle e.g. it would be this 
"platform" thing).

> Gradle isn't something that this project supports, but, wouldn't this be a 
> much bigger general problem if its resolution rules are different from Maven? 
> that is, surely gradle can emulate Maven if necessary.

I don't think Gradle can emulate Maven, and I personally don't think it should, 
because Maven's strategy for conflict resolution is quite unconventional, and 
is not used by most of the dependency management tools, not just in the Java 
world. Also, I naturally don't have statistics, so this is just my speculation, 
but it seems likely to me that most of the downstream projects which use Spark 
don't actually use Maven for dependency management, especially given its Scala 
heritage. Therefore, they can't take advantage of Maven's dependency resolution 
algorithm and the current Spark's POM configuration.

Also I'd like to point out again that this whole BOM mechanism is something 
which _Maven_ supports natively, it's not a Gradle extension or something. The 
BOM concept originated in Maven, and it is declared using Maven's 
{{<dependencyManagement>}} block, which is a part of POM syntax. Hopefully this 
would reduce some of the concerns about it.

> Publish a "bill of materials" (BOM) descriptor for Spark with correct 
> versions of various dependencies
> --
>
> Key: SPARK-32385
> URL: https://issues.apache.org/jira/browse/SPARK-32385
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.1.0
>Reporter: Vladimir Matveev
>Priority: Major
>
> Spark has a lot of dependencies, many of them very common (e.g. Guava, 
> Jackson). Also, versions of these dependencies are not updated as frequently 
> as they are released upstream, which is totally understandable and natural, 
> but which also means that often Spark has a dependency on a lower version of 
> a library, which is incompatible with a higher, more recent version of the 
> same library. This incompatibility can manifest in different ways, e.g. as 
> classpath errors or runtime check errors (like with Jackson), in certain 
> cases.
>  
> Spark does attempt to "fix" versions of its dependencies by declaring them 
> explicitly in its {{pom.xml}} file. However, this approach, being somewhat 
> workable if the Spark-using project itself uses Maven, breaks down if another 
> build system is used, like Gradle. The reason is that Maven uses an 
> unconventional "nearest first" version conflict resolution strategy, while 
> many other tools like Gradle use the "highest first" strategy which resolves 
> the highest possible version number inside the entire graph of dependencies. 
> This means that other dependencies of the project can pull a higher version 
> of some dependency, which is incompatible with Spark.
>  
> One example would be an explicit or a transitive dependency on a higher 
> version of Jackson in the project. Spark itself depends on several modules of 
> Jackson; if only one of them gets a higher version, and others remain on the 
> lower version, this will result in runtime exceptions due to an internal 
> version check in Jackson.
>  
> A widely used solution for this kind of version issue is publishing a 
> "bill of materials" descriptor (see here: 
> [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
>  and here: 
> [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). 
> This descriptor would contain all versions of all dependencies of Spark; then 
> downstream projects will be able to use their build system's support for BOMs 
> to enforce version constraints required for Spark to function correctly.
>  
> One example of a successful implementation of the BOM-based approach is Spring: 
> [https://w

[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-08-16 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178581#comment-17178581
 ] 

Vladimir Matveev commented on SPARK-32385:
--

[~srowen] a BOM descriptor can be used as a "platform" in Gradle and most 
likely in Maven (I don't know for sure, but the concept of a BOM originates 
from Maven, so supposedly the tool itself supports it) to enforce compatible 
version numbers in the dependency graph. Regular POMs alone cannot do this, 
because a regular POM forms just a single node in a dependency graph, and most 
dependency resolution tools take the entire graph of dependencies into account, 
which may result in accidentally bumped versions somewhere, requiring manual, 
ad-hoc resolution in most cases. With a BOM, it is sufficient to tell the 
dependency engine that it should use this BOM to enforce versions, and that's 
it - the versions will now be fixed to the versions declared by the framework 
(Spark in this case). As I said, the Spring framework uses this concept to 
great success to ensure that applications using Spring always have compatible 
and tested versions.

 

Naturally, the `deps/` files are just lists of jar files and cannot be used for 
dependency resolution.

 

Also note that such a BOM descriptor would allow centralizing the version 
declarations within the Spark project itself, so it wouldn't be something "on 
top" to support, at least as far as I understand it.

 

> Publish a "bill of materials" (BOM) descriptor for Spark with correct 
> versions of various dependencies
> --
>
> Key: SPARK-32385
> URL: https://issues.apache.org/jira/browse/SPARK-32385
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.1.0
>Reporter: Vladimir Matveev
>Priority: Major
>
> Spark has a lot of dependencies, many of them very common (e.g. Guava, 
> Jackson). Also, versions of these dependencies are not updated as frequently 
> as they are released upstream, which is totally understandable and natural, 
> but which also means that often Spark has a dependency on a lower version of 
> a library, which is incompatible with a higher, more recent version of the 
> same library. This incompatibility can manifest in different ways, e.g. as 
> classpath errors or runtime check errors (like with Jackson), in certain 
> cases.
>  
> Spark does attempt to "fix" versions of its dependencies by declaring them 
> explicitly in its {{pom.xml}} file. However, this approach, being somewhat 
> workable if the Spark-using project itself uses Maven, breaks down if another 
> build system is used, like Gradle. The reason is that Maven uses an 
> unconventional "nearest first" version conflict resolution strategy, while 
> many other tools like Gradle use the "highest first" strategy which resolves 
> the highest possible version number inside the entire graph of dependencies. 
> This means that other dependencies of the project can pull a higher version 
> of some dependency, which is incompatible with Spark.
>  
> One example would be an explicit or a transitive dependency on a higher 
> version of Jackson in the project. Spark itself depends on several modules of 
> Jackson; if only one of them gets a higher version, and others remain on the 
> lower version, this will result in runtime exceptions due to an internal 
> version check in Jackson.
>  
> A widely used solution for this kind of version issue is publishing a 
> "bill of materials" descriptor (see here: 
> [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
>  and here: 
> [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). 
> This descriptor would contain all versions of all dependencies of Spark; then 
> downstream projects will be able to use their build system's support for BOMs 
> to enforce version constraints required for Spark to function correctly.
>  
> One example of a successful implementation of the BOM-based approach is Spring: 
> [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring 
> projects, e.g. Spring Boot, there are BOM descriptors published which can be 
> used in downstream projects to fix the versions of Spring components and 
> their dependencies, significantly reducing confusion around proper version 
> numbers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-07-29 Thread Vladimir Matveev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167527#comment-17167527
 ] 

Vladimir Matveev commented on SPARK-32385:
--

[~hyukjin.kwon] almost: those are just lists of the artifacts in the distribution, 
while BOMs are proper Maven POM descriptors which contain information about 
dependencies in terms of Maven coordinates. This makes BOMs usable directly as 
input to build systems like Gradle. Still, the general idea is similar, I guess.

> Publish a "bill of materials" (BOM) descriptor for Spark with correct 
> versions of various dependencies
> --
>
> Key: SPARK-32385
> URL: https://issues.apache.org/jira/browse/SPARK-32385
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.1.0
>Reporter: Vladimir Matveev
>Priority: Major
>
> Spark has a lot of dependencies, many of them very common (e.g. Guava, 
> Jackson). Also, versions of these dependencies are not updated as frequently 
> as they are released upstream, which is totally understandable and natural, 
> but which also means that often Spark has a dependency on a lower version of 
> a library, which is incompatible with a higher, more recent version of the 
> same library. This incompatibility can manifest in different ways, e.g. as 
> classpath errors or runtime check errors (like with Jackson), in certain 
> cases.
>  
> Spark does attempt to "fix" versions of its dependencies by declaring them 
> explicitly in its {{pom.xml}} file. However, this approach, being somewhat 
> workable if the Spark-using project itself uses Maven, breaks down if another 
> build system is used, like Gradle. The reason is that Maven uses an 
> unconventional "nearest first" version conflict resolution strategy, while 
> many other tools like Gradle use the "highest first" strategy which resolves 
> the highest possible version number inside the entire graph of dependencies. 
> This means that other dependencies of the project can pull a higher version 
> of some dependency, which is incompatible with Spark.
>  
> One example would be an explicit or a transitive dependency on a higher 
> version of Jackson in the project. Spark itself depends on several modules of 
> Jackson; if only one of them gets a higher version, and others remain on the 
> lower version, this will result in runtime exceptions due to an internal 
> version check in Jackson.
>  
> A widely used solution for this kind of version issue is publishing a 
> "bill of materials" descriptor (see here: 
> [https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
>  and here: 
> [https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). 
> This descriptor would contain all versions of all dependencies of Spark; then 
> downstream projects will be able to use their build system's support for BOMs 
> to enforce version constraints required for Spark to function correctly.
>  
> One example of a successful implementation of the BOM-based approach is Spring: 
> [https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring 
> projects, e.g. Spring Boot, there are BOM descriptors published which can be 
> used in downstream projects to fix the versions of Spring components and 
> their dependencies, significantly reducing confusion around proper version 
> numbers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32385) Publish a "bill of materials" (BOM) descriptor for Spark with correct versions of various dependencies

2020-07-22 Thread Vladimir Matveev (Jira)
Vladimir Matveev created SPARK-32385:


 Summary: Publish a "bill of materials" (BOM) descriptor for Spark 
with correct versions of various dependencies
 Key: SPARK-32385
 URL: https://issues.apache.org/jira/browse/SPARK-32385
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 3.0.0, 2.4.6
Reporter: Vladimir Matveev


Spark has a lot of dependencies, many of them very common (e.g. Guava, 
Jackson). Also, versions of these dependencies are not updated as frequently as 
they are released upstream, which is totally understandable and natural, but 
which also means that often Spark has a dependency on a lower version of a 
library, which is incompatible with a higher, more recent version of the same 
library. This incompatibility can manifest in different ways, e.g. as classpath 
errors or runtime check errors (like with Jackson), in certain cases.

 

Spark does attempt to "fix" versions of its dependencies by declaring them 
explicitly in its {{pom.xml}} file. However, this approach, being somewhat 
workable if the Spark-using project itself uses Maven, breaks down if another 
build system is used, like Gradle. The reason is that Maven uses an 
unconventional "nearest first" version conflict resolution strategy, while many 
other tools like Gradle use the "highest first" strategy which resolves the 
highest possible version number inside the entire graph of dependencies. This 
means that other dependencies of the project can pull a higher version of some 
dependency, which is incompatible with Spark.

 

One example would be an explicit or a transitive dependency on a higher version 
of Jackson in the project. Spark itself depends on several modules of Jackson; 
if only one of them gets a higher version, and others remain on the lower 
version, this will result in runtime exceptions due to an internal version 
check in Jackson.
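
To make the Jackson case concrete, the failure mode looks roughly like this (a 
sketch only; the version mismatch is assumed to come from other dependencies in 
the downstream project, and the exact message depends on the versions involved):

{code:java}
// Sketch: jackson-databind has been pulled up to a newer 2.x by another
// dependency, while jackson-module-scala stayed on the version Spark declares.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val mapper = new ObjectMapper()
// Fails the module's internal version check with something like:
//   com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.x.y
mapper.registerModule(DefaultScalaModule)
{code}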

 

A widely used solution for this kind of version issue is publishing a "bill 
of materials" descriptor (see here: 
[https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html]
 and here: 
[https://docs.gradle.org/current/userguide/platforms.html#sub:bom_import]). 
This descriptor would contain all versions of all dependencies of Spark; then 
downstream projects will be able to use their build system's support for BOMs 
to enforce version constraints required for Spark to function correctly.

 

One example of a successful implementation of the BOM-based approach is Spring: 
[https://www.baeldung.com/spring-maven-bom#spring-bom]. For different Spring 
projects, e.g. Spring Boot, there are BOM descriptors published which can be 
used in downstream projects to fix the versions of Spring components and their 
dependencies, significantly reducing confusion around proper version numbers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results

2019-07-09 Thread Vladimir Matveev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-28321:
-
Description: 
It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating 
methods is wrong 
([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):

 
{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
Here the UDF passed as the first argument will be called *right inside the 
`udf` method* on the driver, rather than at the dataframe computation time on 
executors. One of the major issues here is that non-deterministic UDFs (e.g. 
generating a random value) will produce unexpected results:

 

 
{code:java}
val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic()

(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()

// prints

+-----------+---------+
|      scala|     java|
+-----------+---------+
|  934190385|478543809|
|-1082102515|478543809|
|  774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
|  260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
|  481017016|478543809|
+-----------+---------+
{code}
Note that the version which relies on a different overload of the 
`functions.udf` method works correctly.
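
For what it's worth, deferring the call seems to be all that is needed here; a 
minimal sketch of such a change, assuming the same SparkUserDefinedFunction.create 
signature as in the snippet above (this is not an actual patch, just an 
illustration):

{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  // Wrap the invocation in a function value so that call() runs at execution
  // time for every row, instead of once, eagerly, on the driver.
  val func = () => f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}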

 

  was:
It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating 
methods is wrong 
([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):

 
{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
Here the UDF passed as the first argument will be called *right inside the 
`udf` method* on the driver, rather than at the dataframe computation time on 
executors. One of the major issues here is that non-deterministic UDFs (e.g. 
generating a random value) will produce unexpected results:

 

 
{code:java}
val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic()

(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()

// prints

+-----------+---------+
|      scala|     java|
+-----------+---------+
|  934190385|478543809|
|-1082102515|478543809|
|  774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
|  260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
|  481017016|478543809|
+-----------+---------+
{code}
Note that the version which relies on a different overload of the 
`functions.udf` method works correctly.

 


> functions.udf(UDF0, DataType) produces unexpected results
> -
>
> Key: SPARK-28321
> URL: https://issues.apache.org/jira/browse/SPARK-28321
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.2, 2.4.3
>Reporter: Vladimir Matveev
>Priority: Major
>
> It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating 
> methods is wrong 
> ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):
>  
> {code:java}
> def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
>   val func = f.asInstanceOf[UDF0[Any]].call()
>   SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = 
> Seq.fill(0)(Non

[jira] [Created] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results

2019-07-09 Thread Vladimir Matveev (JIRA)
Vladimir Matveev created SPARK-28321:


 Summary: functions.udf(UDF0, DataType) produces unexpected results
 Key: SPARK-28321
 URL: https://issues.apache.org/jira/browse/SPARK-28321
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.3, 2.3.2
Reporter: Vladimir Matveev


It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating 
methods is wrong 
([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):

 
{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
Here the UDF passed as the first argument will be called *right inside the 
`udf` method* on the driver, rather than at the dataframe computation time on 
executors. One of the major issues here is that non-deterministic UDFs (e.g. 
generating a random value) will produce unexpected results:

 

 
{code:java}
val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic()

(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()

// prints

+-----------+---------+
|      scala|     java|
+-----------+---------+
|  934190385|478543809|
|-1082102515|478543809|
|  774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
|  260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
|  481017016|478543809|
+-----------+---------+
{code}
Note that the version which relies on a different overload of the 
`functions.udf` method works correctly.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27808) Ability to ignore existing files for structured streaming

2019-05-24 Thread Vladimir Matveev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848066#comment-16848066
 ] 

Vladimir Matveev commented on SPARK-27808:
--

Hello Gabor,

 

The issue is about structured streaming. I've provided a link to the old 
dstreams documentation to illustrate an option which was available there but is 
not available now.

 

A different sink implementation is still more of a workaround than a proper 
solution. I'd have to take extra steps, potentially changing the code, to do 
what I need, rather than just changing a configuration option. Also, I'm not 
sure about the exact implementation, but I suspect that even though this new 
sink does not do anything, running the query with it would still mean that the 
entire input is read. I would very much like to avoid that, because in our case 
the input is very large and has infinite retention.
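
Concretely, a no-op sink along these lines is what I understand by "a different 
sink implementation" (a sketch only; `df` is an assumed streaming DataFrame and 
the checkpoint path is hypothetical):

{code:java}
import org.apache.spark.sql.{ForeachWriter, Row}

// A sink that discards every record. Even with such a no-op sink, the first run
// still has to read all of the existing input, which is what I'd like to avoid.
val discardAll = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit = ()
  override def close(errorOrNull: Throwable): Unit = ()
}

df.writeStream
  .foreach(discardAll)
  .option("checkpointLocation", "/checkpoints/my-query") // hypothetical path
  .start()
{code}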

> Ability to ignore existing files for structured streaming
> -
>
> Key: SPARK-27808
> URL: https://issues.apache.org/jira/browse/SPARK-27808
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.3, 2.4.3
>Reporter: Vladimir Matveev
>Priority: Major
>
> Currently it is not easily possible to make a structured streaming query 
> ignore all of the existing data inside a directory and only process new 
> files, created after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything which existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, e.g. catch up reading new files since 
> the last restart. This would allow us to use the streaming job for continuous 
> processing, with all the benefits it brings, but also to keep the possibility 
> to reprocess the data in the batch fashion by a different job, drop the 
> checkpoints and make the streaming job only run for the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]
> but probably with slightly different semantics, described above (ignore all 
> existing for the first run, catch up for the following runs)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming

2019-05-22 Thread Vladimir Matveev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-27808:
-
Description: 
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)

  was:
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original 
[StreamingContext.fileStream|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]]
 method, but probably with slightly different semantics, described above 
(ignore all existing for the first run, catch up for the following runs)


> Ability to ignore existing files for structured streaming
> -
>
> Key: SPARK-27808
> URL: https://issues.apache.org/jira/browse/SPARK-27808
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.3, 2.4.3
>Reporter: Vladimir Matveev
>Priority: Major
>
> Currently it is not easily possible to make a structured streaming query 
> ignore all of the existing data inside a directory and only process new 
> files, created after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything which existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, e.g. catch up reading new files since 
> the last restart. This would allow us to use the streaming job for continuous 
> processing, with all the benefits it brings, but also to keep the possibility 
> to reprocess the data in the batch fashion by a different job, drop the 
> checkpoints and make the streaming job only run for the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:S

[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming

2019-05-22 Thread Vladimir Matveev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-27808:
-
Description: 
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)

  was:
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)


> Ability to ignore existing files for structured streaming
> -
>
> Key: SPARK-27808
> URL: https://issues.apache.org/jira/browse/SPARK-27808
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.3, 2.4.3
>Reporter: Vladimir Matveev
>Priority: Major
>
> Currently it is not easily possible to make a structured streaming query 
> ignore all of the existing data inside a directory and only process new 
> files, created after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything which existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, e.g. catch up reading new files since 
> the last restart. This would allow us to use the streaming job for continuous 
> processing, with all the benefits it brings, but also to keep the possibility 
> to reprocess the data in the batch fashion by a different job, drop the 
> checkpoints and make the streaming job only run for the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:S

[jira] [Updated] (SPARK-27808) Ability to ignore existing files for structured streaming

2019-05-22 Thread Vladimir Matveev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-27808:
-
Description: 
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V])]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)

  was:
Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original StreamingContext.fileStream method: 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]]

but probably with slightly different semantics, described above (ignore all 
existing for the first run, catch up for the following runs)


> Ability to ignore existing files for structured streaming
> -
>
> Key: SPARK-27808
> URL: https://issues.apache.org/jira/browse/SPARK-27808
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.3, 2.4.3
>Reporter: Vladimir Matveev
>Priority: Major
>
> Currently it is not easily possible to make a structured streaming query 
> ignore all of the existing data inside a directory and only process new 
> files, created after the job was started. See here for example: 
> [https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]
>  
> My use case is to ignore everything which existed in the directory when the 
> streaming job is first started (and there are no checkpoints), but to behave 
> as usual when the stream is restarted, e.g. catch up reading new files since 
> the last restart. This would allow us to use the streaming job for continuous 
> processing, with all the benefits it brings, but also to keep the possibility 
> to reprocess the data in the batch fashion by a different job, drop the 
> checkpoints and make the streaming job only run for the new data.
>  
> It would be great to have an option similar to the `newFilesOnly` option on 
> the original StreamingContext.fileStream method: 
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:

[jira] [Created] (SPARK-27808) Ability to ignore existing files for structured streaming

2019-05-22 Thread Vladimir Matveev (JIRA)
Vladimir Matveev created SPARK-27808:


 Summary: Ability to ignore existing files for structured streaming
 Key: SPARK-27808
 URL: https://issues.apache.org/jira/browse/SPARK-27808
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.4.3, 2.3.3
Reporter: Vladimir Matveev


Currently it is not easily possible to make a structured streaming query 
ignore all of the existing data inside a directory and only process new files, 
created after the job was started. See here for example: 
[https://stackoverflow.com/questions/51391722/spark-structured-streaming-file-source-starting-offset]

 

My use case is to ignore everything which existed in the directory when the 
streaming job is first started (and there are no checkpoints), but to behave as 
usual when the stream is restarted, e.g. catch up reading new files since the 
last restart. This would allow us to use the streaming job for continuous 
processing, with all the benefits it brings, but also to keep the possibility 
to reprocess the data in the batch fashion by a different job, drop the 
checkpoints and make the streaming job only run for the new data.

 

It would be great to have an option similar to the `newFilesOnly` option on the 
original 
[StreamingContext.fileStream|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext@fileStream[K,V,F%3C:org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:String,filter:org.apache.hadoop.fs.Path=%3EBoolean,newFilesOnly:Boolean)(implicitevidence$7:scala.reflect.ClassTag[K],implicitevidence$8:scala.reflect.ClassTag[V],implicitevidence$9:scala.reflect.ClassTag[F]):org.apache.spark.streaming.dstream.InputDStream[(K,V)]]
 method, but probably with slightly different semantics, described above 
(ignore all existing for the first run, catch up for the following runs)>
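
For reference, this is roughly what that flag looks like in the old API (a 
sketch only, assuming a Spark shell `sc` and a hypothetical input directory):

{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(30))
val newFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///data/incoming",  // hypothetical directory
  (path: Path) => true,     // accept every file name
  newFilesOnly = true)      // skip files that already existed when the job started
{code}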



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26723) Spark web UI only shows parts of SQL query graphs for queries with persist operations

2019-01-24 Thread Vladimir Matveev (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladimir Matveev updated SPARK-26723:
-
Attachment: Screen Shot 2019-01-24 at 4.13.14 PM.png
Screen Shot 2019-01-24 at 4.13.02 PM.png

> Spark web UI only shows parts of SQL query graphs for queries with persist 
> operations
> -
>
> Key: SPARK-26723
> URL: https://issues.apache.org/jira/browse/SPARK-26723
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: Vladimir Matveev
>Priority: Major
> Attachments: Screen Shot 2019-01-24 at 4.13.02 PM.png, Screen Shot 
> 2019-01-24 at 4.13.14 PM.png
>
>
> Currently it looks like the SQL view in Spark UI will truncate the graph on 
> the nodes corresponding to persist operations on the dataframe, only showing 
> everything after "LocalTableScan". This is *very* inconvenient, because in a 
> common case when you have a heavy computation and want to persist it before 
> writing to multiple outputs with some minor preprocessing, you lose almost 
> the entire graph with potentially very useful information in it.
> The query plans below the graph, however, show the full query, including all 
> computations before persists. Unfortunately, for complex queries looking into 
> the plan is unfeasible, and graph visualization becomes a very helpful tool; 
> with persist, it is apparently broken.
> You can verify it in Spark Shell with a very simple example:
> {code}
> import org.apache.spark.sql.{functions => f}
> import org.apache.spark.sql.expressions.Window
> val query = Vector(1, 2, 3).toDF()
>   .select(($"value".cast("long") * f.rand).as("value"))
>   .withColumn("valueAvg", f.avg($"value") over Window.orderBy("value"))
> query.show()
> query.persist().show()
> {code}
> Here the same query is executed first without persist, and then with it. If 
> you now navigate to the Spark web UI SQL page, you'll see two queries, but 
> their graphs will be radically different: the one without persist will 
> contain the whole transformation with exchange, sort and window steps, while 
> the one with persist will contain only a LocalTableScan step with some 
> intermediate transformations needed for `show`.
> After some looking into Spark code, I think that the reason for this is that 
> the `org.apache.spark.sql.execution.SparkPlanInfo#fromSparkPlan` method 
> (which is used to serialize a plan before emitting the 
> SparkListenerSQLExecutionStart event) constructs the `SparkPlanInfo` object 
> from a `SparkPlan` object incorrectly, because if you invoke the `toString` 
> method on `SparkPlan` you'll see the entire plan, but the `SparkPlanInfo` 
> object will only contain nodes corresponding to actions after `persist`. 
> However, my knowledge of Spark internals is not deep enough to understand how 
> to fix this, and how SparkPlanInfo.fromSparkPlan is different from what 
> SparkPlan.toString does.
> This can be observed on Spark 2.3.2, but given that 2.4.0 code of 
> SparkPlanInfo does not seem to change much since 2.3.2, I'd expect that it 
> could be reproduced there too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26723) Spark web UI only shows parts of SQL query graphs for queries with persist operations

2019-01-24 Thread Vladimir Matveev (JIRA)
Vladimir Matveev created SPARK-26723:


 Summary: Spark web UI only shows parts of SQL query graphs for 
queries with persist operations
 Key: SPARK-26723
 URL: https://issues.apache.org/jira/browse/SPARK-26723
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.3.2
Reporter: Vladimir Matveev


Currently it looks like the SQL view in Spark UI will truncate the graph on the 
nodes corresponding to persist operations on the dataframe, only showing 
everything after "LocalTableScan". This is *very* inconvenient, because in a 
common case when you have a heavy computation and want to persist it before 
writing to multiple outputs with some minor preprocessing, you lose almost the 
entire graph with potentially very useful information in it.

The query plans below the graph, however, show the full query, including all 
computations before persists. Unfortunately, for complex queries looking into 
the plan is unfeasible, and graph visualization becomes a very helpful tool; 
with persist, it is apparently broken.

You can verify it in Spark Shell with a very simple example:
{code}
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.expressions.Window

val query = Vector(1, 2, 3).toDF()
  .select(($"value".cast("long") * f.rand).as("value"))
  .withColumn("valueAvg", f.avg($"value") over Window.orderBy("value"))
query.show()
query.persist().show()
{code}
Here the same query is executed first without persist, and then with it. If you 
now navigate to the Spark web UI SQL page, you'll see two queries, but their 
graphs will be radically different: the one without persist will contain the 
whole transformation with exchange, sort and window steps, while the one with 
persist will contain only a LocalTableScan step with some intermediate 
transformations needed for `show`.

After some looking into Spark code, I think that the reason for this is that 
the `org.apache.spark.sql.execution.SparkPlanInfo#fromSparkPlan` method (which 
is used to serialize a plan before emitting the SparkListenerSQLExecutionStart 
event) constructs the `SparkPlanInfo` object from a `SparkPlan` object 
incorrectly, because if you invoke the `toString` method on `SparkPlan` you'll 
see the entire plan, but the `SparkPlanInfo` object will only contain nodes 
corresponding to actions after `persist`. However, my knowledge of Spark 
internals is not deep enough to understand how to fix this, and how 
SparkPlanInfo.fromSparkPlan is different from what SparkPlan.toString does.
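
To illustrate what I suspect is going on (purely a simplified model; `PlanInfo` 
and `toInfo` below are hypothetical stand-ins, not Spark's actual SparkPlanInfo 
code):

{code:java}
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical, simplified model of the suspected issue, not Spark source code.
case class PlanInfo(nodeName: String, children: Seq[PlanInfo])

// If the conversion only recurses over `children`, any subtree that a node keeps
// elsewhere (e.g. a cached plan referenced by an in-memory scan) never reaches
// the UI graph, even though toString may still print it.
def toInfo(plan: SparkPlan): PlanInfo =
  PlanInfo(plan.nodeName, plan.children.map(toInfo))
{code}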

This can be observed on Spark 2.3.2, but given that 2.4.0 code of SparkPlanInfo 
does not seem to change much since 2.3.2, I'd expect that it could be 
reproduced there too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org