ShuffleManager and Speculative Execution

2023-12-21 Thread Enrico Minack

Hi Spark devs,

I have a question about the ShuffleManager: with speculative execution, the same 
map output file may be created multiple times (by multiple task 
attempts). If both attempts succeed, which one is read by the reduce 
task in the next stage? Is any map output as good as any other?


Thanks for clarification,
Enrico





10x to 100x faster df.groupby().applyInPandas()

2023-12-01 Thread Enrico Minack

Hi devs,

I am looking for a PySpark dev who is interested in a 10x to 100x 
speed-up of df.groupby().applyInPandas() for small groups.


A PoC and benchmark can be found at 
https://github.com/apache/spark/pull/37360#issuecomment-1228293766.


I suppose the same approach could be taken to improve the performance of 
vectorized UDFs (for small groups): 
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.pandas_udf.html


Happy to turn this into a proper pull request if someone volunteers to 
review this.


Cheers,
Enrico





On adding applyInArrow to groupBy and cogroup

2023-10-26 Thread Enrico Minack

Hi devs,

PySpark allows transforming a DataFrame via the Pandas *and* the Arrow API:

df.mapInArrow(map_arrow, schema="...")
df.mapInPandas(map_pandas, schema="...")

For df.groupBy(...) and df.groupBy(...).cogroup(...), there is 
*only* a Pandas interface, no Arrow interface:


df.groupBy("id").applyInPandas(apply_pandas, schema="...")

Providing a pure Arrow interface allows user code to use *any* 
Arrow-based data framework, not only Pandas, e.g. Polars. Adding Arrow 
interfaces reduces the need to add more framework-specific support.


We need your thoughts on whether PySpark should support Arrow on a par 
with Pandas, or not: https://github.com/apache/spark/pull/38624


Cheers,
Enrico


Re: [Reminder] Spark 3.5 Branch Cut

2023-07-15 Thread Enrico Minack
Speaking of JdbcDialect, is there any interest in getting upserts for 
JDBC into 3.5.0?


[SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC: 
https://github.com/apache/spark/pull/41518
[SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC using 
MERGE INTO with temp table: https://github.com/apache/spark/pull/41611


Enrico


On 15.07.23 at 04:10, Jia Fan wrote:
Can we put [SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` 
to JdbcDialect into 3.5.0?

https://github.com/apache/spark/pull/41855
Since this is the last major version update of 3.x, I think we need to 
make sure JdbcDialect can support more databases.



Gengliang Wang wrote on Sat, Jul 15, 2023 at 05:20:

Hi Yuanjian,

Besides the abovementioned changes, it would be great to include
the UI page for Spark Connect: SPARK-44394.

Best Regards,
Gengliang

On Fri, Jul 14, 2023 at 11:44 AM Julek Sompolski
 wrote:

Thank you,
My changes that you listed are tracked under this Epic:
https://issues.apache.org/jira/browse/SPARK-43754
I am also working on
https://issues.apache.org/jira/browse/SPARK-44422, didn't
mention it before because I have hopes that this one will make
it before the cut.

(Unrelated) My colleague is also working on
https://issues.apache.org/jira/browse/SPARK-43923 and I am
reviewing https://github.com/apache/spark/pull/41443, so I
hope that that one will also make it before the cut.

Best regards,
Juliusz Sompolski

On Fri, Jul 14, 2023 at 7:34 PM Yuanjian Li
 wrote:

Hi everyone,
As discussed earlier in "Time for Spark v3.5.0 release", I
will cut branch-3.5 on *Monday, July 17th at 1 pm PST* as
scheduled.

Please plan your PR merge accordingly with the given
timeline. Currently, we have received the following
exception merge requests:

  * SPARK-44421: Reattach to existing execute in Spark
Connect (server mechanism)
  * SPARK-44423:  Reattach to existing execute in Spark
Connect (scala client)
  * SPARK-44424:  Reattach to existing execute in Spark
Connect (python client)

If there are any other exception feature requests, please
reply to this email. We will not merge any new features in
3.5 after the branch cut.

Best,
Yuanjian



Re: [DISCUSS] Add SQL functions into Scala, Python and R API

2023-05-24 Thread Enrico Minack

+1

Functions available in SQL (more general in one API) should be available 
in all APIs. I am very much in favor of this.


Enrico


On 24.05.23 at 09:41, Hyukjin Kwon wrote:


Hi all,

I would like to discuss adding all SQL functions into Scala, Python 
and R API.

We have around 175 SQL functions that do not exist in Scala, Python and R.
For example, we don’t have pyspark.sql.functions.percentile, but you can invoke 
it as a SQL function, e.g., SELECT percentile(...).

The reason why we do not have all functions in the first place is that 
we wanted to only add commonly used functions, see also 
https://github.com/apache/spark/pull/21318 (which I agreed with at the time).


However, this has been raised multiple times over the years, from the OSS 
community, dev mailing list, JIRAs, Stack Overflow, etc.

It seems confusing which functions are available and which are not.

Yes, we have a workaround: we can call all expressions via 
expr("...") or call_udf("...", Columns ...).
But it still seems not very user-friendly, because users 
expect these functions to be available under the functions namespace.
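As a quick illustration of that workaround, a minimal Scala sketch (assuming 
a spark-shell or an existing SparkSession named `spark`; percentile is just 
the example SQL-only function from above):

import org.apache.spark.sql.functions.{expr, call_udf, lit}
import spark.implicits._

val data = Seq(1.0, 2.0, 3.0, 4.0).toDF("v")

// invoke the SQL-only function through expr(...)
data.select(expr("percentile(v, 0.5)").as("median")).show()

// or resolve it by name via call_udf(...)
data.select(call_udf("percentile", $"v", lit(0.5)).as("median")).show()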


Therefore, I would like to propose adding all expressions to all 
languages so that Spark is simpler and less confusing, e.g., about which 
API is in functions and which is not.


Any thoughts?



Re: Spark 3.2.4 pom NOT FOUND on maven

2023-04-21 Thread Enrico Minack

Hi Dongjoon,

thanks for confirmation.

I have added the Apache release repository to my project, so it fetches 
the jars from there instead of Maven Central.

That is a great workaround until the issue on Maven Central is resolved.
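For reference, a minimal sketch of that workaround for an sbt build (the 
project in this thread uses a Maven POM, where the equivalent is an extra 
repository entry; the repository name below is arbitrary):

// build.sbt: also resolve artifacts from the Apache release repository
resolvers += "Apache Releases" at "https://repository.apache.org/content/repositories/releases/"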

Cheers,
Enrico


On 19.04.23 at 03:04, Dongjoon Hyun wrote:

Thank you for reporting, Enrico.

I verified your issue report and also double-checked that both the original 
official Apache repository and the Google Maven Mirror work correctly. Given that, 
it could be a transient issue, because the artifacts are copied from the 
Apache repository to Maven Central and then to the Google Mirror. That means they 
were fine until they were copied to the Google Mirror.

1) 
https://repository.apache.org/content/repositories/releases/org/apache/spark/spark-parent_2.13/3.2.4/spark-parent_2.13-3.2.4.pom

2) 
https://maven-central.storage-download.googleapis.com/maven2/org/apache/spark/spark-parent_2.13/3.2.4/spark-parent_2.13-3.2.4.pom

You may want to use (1) and (2) repositories temporarily while waiting for 
`repo1.maven.org`'s recovery.

Dongjoon.


On 2023/04/18 05:38:59 Enrico Minack wrote:

Any suggestions on how to fix or use the Spark 3.2.4 (Scala 2.13) release?

Cheers,
Enrico


On 17.04.23 at 08:19, Enrico Minack wrote:

Hi,

thanks for the Spark 3.2.4 release.

I have found that Maven does not serve the spark-parent_2.13 pom file.
It is listed in the directory:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/

But it cannot be downloaded:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/spark-parent_2.13-3.2.4.pom


The 2.12 file is fine:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.12/3.2.4/spark-parent_2.12-3.2.4.pom


Any chance this can be fixed?

Cheers,
Enrico





Re: Spark 3.4.0 with Hadoop2.7 cannot be downloaded

2023-04-20 Thread Enrico Minack

Hi Xinrong,

thanks for fixing the download page.

Cheers,
Enrico


On 20.04.23 at 16:09, Xinrong Meng wrote:

Hi Enrico,

Thank you for the report!

We intentionally dropped the Hadoop 2 binary distribution from the Apache 
Spark 3.4 release process, considering that the Hadoop 3 binary distribution is 
recommended for all Hadoop clusters. Please see SPARK-40651 
(https://issues.apache.org/jira/browse/SPARK-40651).


The option to download Spark 3.4.0 with Hadoop 2.7 has been removed 
from the Downloads page to avoid confusion.


Thanks,

Xinrong Meng

On Wed, Apr 19, 2023 at 11:24 PM Enrico Minack 
 wrote:


Hi,

selecting Spark 3.4.0 with Hadoop2.7 at
https://spark.apache.org/downloads.html leads to


https://www.apache.org/dyn/closer.lua/spark/spark-3.4.0/spark-3.4.0-bin-hadoop2.tgz

saying:

The requested file or directory is *not* on the mirrors.

The object is not in our archive https://archive.apache.org/dist/

Is this expected?

Enrico



Spark 3.4.0 with Hadoop2.7 cannot be downloaded

2023-04-20 Thread Enrico Minack

Hi,

selecting Spark 3.4.0 with Hadoop2.7 at 
https://spark.apache.org/downloads.html leads to


https://www.apache.org/dyn/closer.lua/spark/spark-3.4.0/spark-3.4.0-bin-hadoop2.tgz

saying:

The requested file or directory is *not* on the mirrors.

The object is not in our archive https://archive.apache.org/dist/

Is this expected?

Enrico


Re: Spark 3.2.4 pom NOT FOUND on maven

2023-04-17 Thread Enrico Minack

Any suggestions on how to fix or use the Spark 3.2.4 (Scala 2.13) release?

Cheers,
Enrico


On 17.04.23 at 08:19, Enrico Minack wrote:

Hi,

thanks for the Spark 3.2.4 release.

I have found that Maven does not serve the spark-parent_2.13 pom file. 
It is listed in the directory:

https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/

But it cannot be downloaded:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/spark-parent_2.13-3.2.4.pom 



The 2.12 file is fine:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.12/3.2.4/spark-parent_2.12-3.2.4.pom 



Any chance this can be fixed?

Cheers,
Enrico





Spark 3.2.4 pom NOT FOUND on maven

2023-04-17 Thread Enrico Minack

Hi,

thanks for the Spark 3.2.4 release.

I have found that Maven does not serve the spark-parent_2.13 pom file. 
It is listed in the directory:

https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/

But it cannot be downloaded:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.13/3.2.4/spark-parent_2.13-3.2.4.pom

The 2.12 file is fine:
https://repo1.maven.org/maven2/org/apache/spark/spark-parent_2.12/3.2.4/spark-parent_2.12-3.2.4.pom

Any chance this can be fixed?

Cheers,
Enrico





Re: Spark Union performance issue

2023-02-22 Thread Enrico Minack
So you union two tables, union the result with another one, and finally 
with a last one?


How many columns do all these tables have?

Are you sure creating the plan depends on the number of rows?
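One way to check that, a minimal Scala sketch that times only the planning 
of the unioned result, without running any action (df1, df2 and df3 stand in 
for the tables from this thread):

val unioned = df1.union(df2).union(df3)

val start = System.nanoTime()
unioned.queryExecution.executedPlan  // forces analysis, optimization and physical planning
val planningMs = (System.nanoTime() - start) / 1000000
println(s"plan creation took $planningMs ms")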

Enrico


On 22.02.23 at 19:08, Prem Sahoo wrote:

here is the information that was missing:
1. Spark 3.2.0
2. it is Scala based
3. size of tables will be ~60G
4. the explain plan for Catalyst shows lots of time is being spent in 
creating the plan

5. the number of unioned tables is 2, then another 2, and finally 2

the slowness in producing the result grows as the data size & number of columns increase.

On Wed, Feb 22, 2023 at 11:07 AM Enrico Minack 
 wrote:


Plus number of unioned tables would be helpful, as well as which
downstream operations are performed on the unioned tables.

And what "performance issues" do you exactly measure?

Enrico



On 22.02.23 at 16:50, Mich Talebzadeh wrote:

Hi,

Few details will help

 1. Spark version
 2. Spark SQL, Scala or PySpark
 3. size of tables in join.
 4. What does explain() or the joining operation show?


HTH


View my LinkedIn profile:
https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility
for any loss, damage or destruction of data or any other property
which may arise from relying on this email's technical content is
explicitly disclaimed. The author will in no case be liable for
any monetary damages arising from such loss, damage or destruction.



On Wed, 22 Feb 2023 at 15:42, Prem Sahoo 
wrote:

Hello Team,
We are observing Spark Union performance issues when unioning
big tables with lots of rows. Do we have any option apart
from the Union ?





Re: Spark Union performance issue

2023-02-22 Thread Enrico Minack
Plus number of unioned tables would be helpful, as well as which 
downstream operations are performed on the unioned tables.


And what "performance issues" do you exactly measure?

Enrico



On 22.02.23 at 16:50, Mich Talebzadeh wrote:

Hi,

Few details will help

 1. Spark version
 2. Spark SQL, Scala or PySpark
 3. size of tables in join.
 4. What does explain() or the joining operation show?


HTH


View my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.




On Wed, 22 Feb 2023 at 15:42, Prem Sahoo  wrote:

Hello Team,
We are observing Spark Union performance issues when unioning big
tables with lots of rows. Do we have any option apart from the Union ?



Re: [VOTE] Release Spark 3.3.2 (RC1)

2023-02-12 Thread Enrico Minack

RC builds and all our downstream tests are green, thanks for the release!


On 11.02.23 at 06:00, L. C. Hsieh wrote:

Please vote on releasing the following candidate as Apache Spark version 3.3.2.

The vote is open until Feb 15th 9AM (PST) and passes if a majority +1
PMC votes are cast, with a minimum of 3 +1 votes.

[ ] +1 Release this package as Apache Spark 3.3.2
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see https://spark.apache.org/

The tag to be voted on is v3.3.2-rc1 (commit
5103e00c4ce5fcc4264ca9c4df12295d42557af6):
https://github.com/apache/spark/tree/v3.3.2-rc1

The release files, including signatures, digests, etc. can be found at:
https://dist.apache.org/repos/dist/dev/spark/v3.3.2-rc1-bin/

Signatures used for Spark RCs can be found in this file:
https://dist.apache.org/repos/dist/dev/spark/KEYS

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1433/

The documentation corresponding to this release can be found at:
https://dist.apache.org/repos/dist/dev/spark/v3.3.2-rc1-docs/

The list of bug fixes going into 3.3.2 can be found at the following URL:
https://issues.apache.org/jira/projects/SPARK/versions/12352299

This release is using the release script of the tag v3.3.2-rc1.

FAQ

=
How can I help test this release?
=

If you are a Spark user, you can help us test this release by taking
an existing Spark workload and running on this release candidate, then
reporting any regressions.

If you're working in PySpark you can set up a virtual env and install
the current RC and see if anything important breaks, in the Java/Scala
you can add the staging repository to your project's resolvers and test
with the RC (make sure to clean up the artifact cache before/after so
you don't end up building with an out-of-date RC going forward).

===
What should happen to JIRA tickets still targeting 3.3.2?
===

The current list of open tickets targeted at 3.3.2 can be found at:
https://issues.apache.org/jira/projects/SPARK and search for "Target
Version/s" = 3.3.2

Committers should look at those and triage. Extremely important bug
fixes, documentation, and API tweaks that impact compatibility should
be worked on immediately. Everything else please retarget to an
appropriate release.

==
But my bug isn't fixed?
==

In order to make timely releases, we will typically not hold the
release unless the bug in question is a regression from the previous
release. That being said, if there is something which is a regression
that has not been correctly targeted please ping me or a committer to
help target the issue.




Re: Time for Spark 3.4.0 release?

2023-01-17 Thread Enrico Minack
You are saying the RCs are cut from that branch at a later point? What 
is the estimated deadline for that?


Enrico



On 18.01.23 at 07:59, Hyukjin Kwon wrote:

It looks like we can fix these after the branch cut, so this should be fine.

On Wed, 18 Jan 2023 at 15:57, Enrico Minack  
wrote:


Hi Xinrong,

what about regression issue
https://issues.apache.org/jira/browse/SPARK-40819
and correctness issue
https://issues.apache.org/jira/browse/SPARK-40885?

The latter gets fixed by either
https://issues.apache.org/jira/browse/SPARK-41959 or
https://issues.apache.org/jira/browse/SPARK-42049.

Are those considered important?

Cheers,
Enrico


On 18.01.23 at 04:29, Xinrong Meng wrote:

Hi All,

Considering there are still important unresolved issues (some are
shown below), I would suggest being conservative and delaying the
branch-3.4 cut by one week.

https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-41589
https://issues.apache.org/jira/browse/SPARK-42075
https://issues.apache.org/jira/browse/SPARK-25299
https://issues.apache.org/jira/browse/SPARK-41053

I plan to cut *branch-3.4* at *18:30 PT, January 24, 2023*.
Please ensure your changes for Apache Spark 3.4 are ready by
that time.

Feel free to reply to the email if you have other ongoing big
items for Spark 3.4.

Thanks,

Xinrong Meng

On Sat, Jan 7, 2023 at 9:16 AM Hyukjin Kwon 
wrote:

Thanks Xinrong.

On Sat, Jan 7, 2023 at 9:18 AM Xinrong Meng
 wrote:

The release window for Apache Spark 3.4.0 is updated per
https://github.com/apache/spark-website/pull/430.

Thank you all!

On Thu, Jan 5, 2023 at 2:10 PM Maxim Gekk
 wrote:

+1

On Thu, Jan 5, 2023 at 12:25 AM huaxin gao
 wrote:

+1 Thanks!

On Wed, Jan 4, 2023 at 10:19 AM L. C. Hsieh
 wrote:

+1

Thank you!

On Wed, Jan 4, 2023 at 9:13 AM Chao Sun
 wrote:

+1, thanks!

Chao

On Wed, Jan 4, 2023 at 1:56 AM Mridul
Muralidharan  wrote:


+1, Thanks !

Regards,
Mridul

On Wed, Jan 4, 2023 at 2:20 AM
Gengliang Wang  wrote:

+1, thanks for driving the release!


Gengliang

On Tue, Jan 3, 2023 at 10:55 PM
Dongjoon Hyun
 wrote:

+1

Thank you!

Dongjoon

On Tue, Jan 3, 2023 at 9:44
PM Rui Wang
 wrote:

+1 to cut the branch
starting from a workday!

Great to see this is
happening!

Thanks Xinrong!

-Rui

On Tue, Jan 3, 2023 at
9:21 PM 416161...@qq.com
 wrote:

+1, thank you Xinrong
for driving this release!




Ruifeng Zheng
ruife...@foxmail.com




--
Original
--
*From:* "Hyukjin
Kwon"
;
*D

Re: Time for Spark 3.4.0 release?

2023-01-17 Thread Enrico Minack

Hi Xinrong,

what about regression issue 
https://issues.apache.org/jira/browse/SPARK-40819

and correctness issue https://issues.apache.org/jira/browse/SPARK-40885?

The latter gets fixed by either 
https://issues.apache.org/jira/browse/SPARK-41959 or 
https://issues.apache.org/jira/browse/SPARK-42049.


Are those considered important?

Cheers,
Enrico


On 18.01.23 at 04:29, Xinrong Meng wrote:

Hi All,

Considering there are still important unresolved issues (some are 
shown below), I would suggest being conservative and delaying the 
branch-3.4 cut by one week.


https://issues.apache.org/jira/browse/SPARK-39375
https://issues.apache.org/jira/browse/SPARK-41589
https://issues.apache.org/jira/browse/SPARK-42075
https://issues.apache.org/jira/browse/SPARK-25299
https://issues.apache.org/jira/browse/SPARK-41053

I plan to cut *branch-3.4* at *18:30 PT, January 24, 2023*. Please 
ensure your changes for Apache Spark 3.4 are ready by that time.


Feel free to reply to the email if you have other ongoing big items 
for Spark 3.4.


Thanks,

Xinrong Meng

On Sat, Jan 7, 2023 at 9:16 AM Hyukjin Kwon  wrote:

Thanks Xinrong.

On Sat, Jan 7, 2023 at 9:18 AM Xinrong Meng
 wrote:

The release window for Apache Spark 3.4.0 is updated per
https://github.com/apache/spark-website/pull/430.

Thank you all!

On Thu, Jan 5, 2023 at 2:10 PM Maxim Gekk
 wrote:

+1

On Thu, Jan 5, 2023 at 12:25 AM huaxin gao
 wrote:

+1 Thanks!

On Wed, Jan 4, 2023 at 10:19 AM L. C. Hsieh
 wrote:

+1

Thank you!

On Wed, Jan 4, 2023 at 9:13 AM Chao Sun
 wrote:

+1, thanks!

Chao

On Wed, Jan 4, 2023 at 1:56 AM Mridul
Muralidharan  wrote:


+1, Thanks !

Regards,
Mridul

On Wed, Jan 4, 2023 at 2:20 AM Gengliang
Wang  wrote:

+1, thanks for driving the release!


Gengliang

On Tue, Jan 3, 2023 at 10:55 PM
Dongjoon Hyun
 wrote:

+1

Thank you!

Dongjoon

On Tue, Jan 3, 2023 at 9:44 PM Rui
Wang  wrote:

+1 to cut the branch starting
from a workday!

Great to see this is happening!

Thanks Xinrong!

-Rui

On Tue, Jan 3, 2023 at 9:21 PM
416161...@qq.com
 wrote:

+1, thank you Xinrong for
driving this release!




Ruifeng Zheng
ruife...@foxmail.com





--
Original --
*From:* "Hyukjin Kwon"
;
*Date:* Wed, Jan 4, 2023
01:15 PM
*To:* "Xinrong
Meng";
*Cc:* "dev";
*Subject:* Re: Time for
Spark 3.4.0 release?

SGTM +1

On Wed, Jan 4, 2023 at
2:13 PM Xinrong Meng

wrote:

Hi All,

Shall we cut
*branch-3.4* on

Re: Time for Spark 3.4.0 release?

2023-01-04 Thread Enrico Minack

Hi All,

can we get these correctness issues fixed with the 3.4 release, please?

SPARK-41162 incorrect query plan for anti-join and semi-join of 
self-joined aggregations (since 3.1), fix in 
https://github.com/apache/spark/pull/39131
SPARK-40885 losing in-partition order for string type partition columns 
when partitioned-writing (introduced in Spark 3.4)


Cheers,
Enrico


On 04.01.23 at 06:12, Xinrong Meng wrote:

Hi All,

Shall we cut *branch-3.4* on *January 16th, 2023*? We proposed January 
15th per
https://spark.apache.org/versioning-policy.html, but I would suggest 
we postpone one day since January 15th is a Sunday.


I would like to volunteer as the release manager for *Apache Spark 3.4.0*.

Thanks,

Xinrong Meng



Re: Does partitioned write preserve in-partition order?

2022-10-11 Thread Enrico Minack

Some more insights on configuration settings related to this issue:

With spark.sql.adaptive.enabled set to true, this fails for all 3.x 
versions except master (3.4.0-SNAPSHOT). When set to false, it works 
as expected for all versions.

With spark.sql.adaptive.enabled set to true and 
spark.sql.adaptive.coalescePartitions.enabled set to false, it still fails 
for all versions before 3.4.0.
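For completeness, this is how the two settings above can be toggled on an 
existing SparkSession named `spark` (a minimal sketch; they can equally be 
passed via --conf or spark-defaults.conf):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")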



Enrico


On 11.10.22 at 12:15, Enrico Minack wrote:


Hi Devs,

this has been raised by Swetha on the user mailing list, and it also 
hit us recently.


Here is the question again:

*Is it guaranteed that written files are sorted as stated in 
**sortWithinPartitions**?*


ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")

This construct is a common use case to generate partitioned and sorted 
files, where downstream systems depend on guaranteed order.


Instead of

0
1
2
3
4
...
995
996
997
998
999

You get

0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
998
1611391
999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607

It used to work until 3.0.3. *Was this guaranteed to work, or did it just 
happen to be correct?*


It stopped working with 3.1.0, but we can work around it by setting 
spark.sql.adaptive.coalescePartitions.enabled="false". *Is that 
guaranteed to fix it?*


With 3.2.x and 3.3.x, the workaround does not work. *Is there a 
workaround?*


It has been fixed in 3.4.0-SNAPSHOT. *Was that fixed intentionally or 
accidentally?*



Code to reproduce:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 1000
val days = 10

case class Value(day: Long, id: Long)

val ds = spark.range(days).withColumnRenamed("id", 
"day").join(spark.range(ids)).as[Value]


// days * 10 is required, as well as a sufficiently large value for 
ids (10m) and day (10)

ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

val df = 
spark.read.schema(Encoders.product[Value].schema).csv("interleaved.csv")


Check the written files are sorted (says OK when they are sorted):

for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c


Thanks for your background knowledge on this.

Cheers,
Enrico



Does partitioned write preserve in-partition order?

2022-10-11 Thread Enrico Minack

Hi Devs,

this has been raised by Swetha on the user mailing list, and it also hit 
us recently.


Here is the question again:

*Is it guaranteed that written files are sorted as stated in 
**sortWithinPartitions**?*


ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")

This construct is a common use case to generate partitioned and sorted 
files, where downstream systems depend on guaranteed order.


Instead of

0
1
2
3
4
...
995
996
997
998
999

You get

0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
998
1611391
999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607

It used to work until 3.0.3. *Was this guaranteed to work, or did it just 
happen to be correct?*


It stopped working with 3.1.0, but we can work around it by setting 
spark.sql.adaptive.coalescePartitions.enabled="false". *Is that 
guaranteed to fix it?*


With 3.2.x and 3.3.x, the workaround does not work. *Is there a workaround?*

It has been fixed in 3.4.0-SNAPSHOT. *Was that fixed intentionally or 
accidentally?*



Code to reproduce:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 1000
val days = 10

case class Value(day: Long, id: Long)

val ds = spark.range(days).withColumnRenamed("id", 
"day").join(spark.range(ids)).as[Value]


// days * 10 is required, as well as a sufficiently large value for ids 
(10m) and day (10)

ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

val df = 
spark.read.schema(Encoders.product[Value].schema).csv("interleaved.csv")


Check the written files are sorted (says OK when they are sorted):

for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c


Thanks for your background knowledge on this.

Cheers,
Enrico


Support for spark-packages.org

2022-09-13 Thread Enrico Minack

Hi devs,

I understand that spark-packages.org is not associated with Apache and 
Apache Spark, but hosted by Databricks. Does anyone have any pointers on 
how to get support? The e-mail address feedb...@spark-packages.org does 
not respond.


I found a few "missing features" that block me from registering my 
packages / releases.


Thanks a lot,
Enrico





Re: Cannot resolve graphx 3.4.0-SNAPSHOT

2022-06-19 Thread Enrico Minack
The issue is solved by explicitly adding the 
https://repository.apache.org/snapshots repository to my POM.

Maven resolved other packages from that repo, and this has worked for 
snapshots before.


Thanks anyway,
Enrico


On 19.06.22 at 22:30, Enrico Minack wrote:


Hi devs,

moving to 3.4.0 snapshots, Spark modules resolve perfectly fine for 
3.4.0-SNAPSHOT, except for graphx:


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-graphx_2.12</artifactId>
  <version>3.4.0-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>
...
Downloading from apache.snapshots: 
https://repository.apache.org/snapshots/org/apache/spark/spark-catalyst_2.12/3.4.0-SNAPSHOT/maven-metadata.xml
Downloaded from apache.snapshots: 
https://repository.apache.org/snapshots/org/apache/spark/spark-catalyst_2.12/3.4.0-SNAPSHOT/maven-metadata.xml 
(1.7 kB at 15 kB/s)
[WARNING] The POM for 
org.apache.spark:spark-graphx_2.12:jar:3.4.0-SNAPSHOT is missing, no 
dependency information available


The respective 
https://repository.apache.org/snapshots/org/apache/spark/spark-graphx_2.12/3.4.0-SNAPSHOT/maven-metadata.xml 
exists, it contains the POM entry, and the respective POM file exists.


I cannot see why mvn cannot resolve this particular dependency.

Any hints?

Cheers,
Enrico






Cannot resolve graphx 3.4.0-SNAPSHOT

2022-06-19 Thread Enrico Minack

Hi devs,

moving to 3.4.0 snapshots, Spark modules resolve perfectly fine for 
3.4.0-SNAPSHOT, except for graphx:


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-graphx_2.12</artifactId>
  <version>3.4.0-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>

...
Downloading from apache.snapshots: 
https://repository.apache.org/snapshots/org/apache/spark/spark-catalyst_2.12/3.4.0-SNAPSHOT/maven-metadata.xml
Downloaded from apache.snapshots: 
https://repository.apache.org/snapshots/org/apache/spark/spark-catalyst_2.12/3.4.0-SNAPSHOT/maven-metadata.xml 
(1.7 kB at 15 kB/s)
[WARNING] The POM for 
org.apache.spark:spark-graphx_2.12:jar:3.4.0-SNAPSHOT is missing, no 
dependency information available


The respective 
https://repository.apache.org/snapshots/org/apache/spark/spark-graphx_2.12/3.4.0-SNAPSHOT/maven-metadata.xml 
exists, it contains the POM entry, and the respective POM file exists.


I cannot see why mvn cannot resolve this particular dependency.

Any hints?

Cheers,
Enrico


Re: [Spark] [SQL] Updating Spark from version 3.0.1 to 3.2.1 reduced functionality for working with parquet files

2022-06-05 Thread Enrico Minack

Hi,

It looks like the error comes from the Parquet library. Has the library 
version changed when moving to 3.2.1? What are the Parquet versions used in 
3.0.1 and 3.2.1? Can you read that Parquet file with the newer Parquet 
library version natively (without Spark)? Then this might be a Parquet 
issue, not a Spark issue.


Unless Spark 3.2.1 does predicate filter pushdown while 3.0.1 did not, 
and pushdown on repeated columns has never been supported by Parquet. In 
that case, disabling the filter pushdown feature should help: 
config("spark.sql.parquet.filterPushdown", false).
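A minimal Scala sketch of that suggestion (assuming an existing SparkSession 
named `spark`; the setting can also be applied while building the session):

// disable Parquet predicate pushdown for the running session
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

// or set it when constructing the session
val session = org.apache.spark.sql.SparkSession.builder()
  .config("spark.sql.parquet.filterPushdown", value = false)
  .getOrCreate()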


Enrico


On 05.06.22 at 10:37, Amin Borjian wrote:


Hi.

We are updating our Spark cluster from version 3.0.1 to 3.2.1 in order 
to benefit from lots of improvements. Everything was good until we saw 
strange behavior. Assume the following protobuf structure:


message Model {
  string name = 1;
  repeated string keywords = 2;
}

We store the protobuf messages in Parquet files in HDFS using the Parquet 
library. Before Spark 3.2.1, we could run the query below on Spark:


val df = spark.read.parquet("/path/to/parquet")
df.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords, 'XXX')").show(false)


However, after updating Spark to version 3.2.1, we receive the following 
error (at the end of this email). I think we lost a good feature! Is this by 
mistake or on purpose? Can we somehow fix the problem without reverting? 
Should we wait for a new release? Thank you in advance for your help.


Caused by: java.lang.IllegalArgumentException: FilterPredicates do not 
currently support repeated columns. Column keywords is repeated.


  at 
org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)


  at 
org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)


  at 
org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)


  at 
org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)


  at 
org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)


  at 
org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)


  at 
org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)


  at 
org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)


  at 
org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)


  at 
org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)


  at 
org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)


  at 
org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:789)


  at 
org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)


  at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)


  at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)


  at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)


  at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)


  at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)


  at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)


  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)

  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown 
Source)


  at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)


  at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)


  at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)


  at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)


  at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)


  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)


  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)

  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)

  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)

  at org.apache.spark.scheduler.Task.run(Task.scala:131)

  at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)


  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)

  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)

  at 

Re: [Spark Core]: Support for un-pivoting data ('melt')

2022-04-11 Thread Enrico Minack
The melt function has recently been implemented in the PySpark Pandas 
API (because melt is part of the Pandas API). I think the Scala/Java 
Dataset and Python DataFrame APIs deserve this method equally well, 
ideally all based on one implementation.


I'd like to fuel the conversation with some code: 
https://github.com/apache/spark/pull/36150


Cheers,
Enrico


On 02.01.22 at 20:59, Daniel Davies wrote:

Level: Intermediate (I think?)
Scenario: Feature Request

Hello dev@,

(First time posting on this mailing list; apologies in advance if this 
should have been routed elsewhere or is missing any information).


Un-pivoting data is supported on numerous SQL engines & in Pandas 
(with the 'melt' function), but it isn't directly available in Spark. 
It's easy enough to derive this functionality using the 'stack' 
function or a combination of struct, array, and explode (e.g. the 
reproduction of the melt function in pandas-on-pyspark), but I was 
wondering whether a more native solution had been considered? It would 
make end-user code more lightweight at the very least; and I wonder 
whether it could be made more efficient than using the stack 
function/struct-array-explode method.
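For reference, a minimal Scala sketch of the stack-based workaround 
mentioned above (assuming a SparkSession named `spark`; the columns id, a 
and b are illustrative):

import spark.implicits._

val wide = Seq((1, 10.0, 20.0), (2, 30.0, 40.0)).toDF("id", "a", "b")

// un-pivot columns a and b into (variable, value) pairs
val long = wide.selectExpr(
  "id",
  "stack(2, 'a', a, 'b', b) as (variable, value)")

long.show()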


I'm happy to try and make a PR if this is something that might be 
useful within Spark. No worries if this is not something that you 
think should be supported; the methods above work and are well 
documented on StackOverflow. I was personally just caught out by this, 
and thought it would be useful to raise.


I did see a thread in the Pony archive about this issue, but it looks 
like it didn't go anywhere. Does anyone else have context on this?


Kind Regards,

--
/_*Daniel Davies*_/




Re: [SPARK-34806] Observable Metrics on Spark Datasets

2021-03-20 Thread Enrico Minack

The PR can be found here: https://github.com/apache/spark/pull/31905


On 19.03.21 at 10:55, Enrico Minack wrote:


I'll sketch out a PR so we can talk code and move the discussion there.



On 18.03.21 at 14:55, Wenchen Fan wrote:
I think a listener-based API makes sense for streaming (since you 
need to keep watching the result), but may not be very reasonable for 
batch queries (you only get the result once). The idea of 
Observation looks good, but we should define what happens if 
`observation.get` is called before the batch query finishes.


Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim 
<kabhwan.opensou...@gmail.com> wrote:


Please follow up the discussion in the origin PR.
https://github.com/apache/spark/pull/26127
<https://github.com/apache/spark/pull/26127>

Dataset.observe() relies on the query listener for the batch
query which is an "unstable" API - that's why we decided to not
add an example for the batch query. For streaming query, it
relies on the streaming query listener which is a stable API.
That said, personally I'd consider the new API to be fit to the
streaming query first, and see whether it fits to the batch query
as well.

If we found Dataset.observe() to be useful enough on the batch
query, we'd probably be better to discuss how to provide these
metrics against a stable API (so that Scala users could leverage
it), and look back later for PySpark. That looks to be the first
one to do if we have a consensus on the usefulness of observable
metrics on batch query.


On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack
<m...@enrico.minack.dev> wrote:

I am focusing on batch mode, not streaming mode. I would
argue that Dataset.observe() is equally useful for large
batch processing. If you need some motivating use cases,
please let me know.

Anyhow, the documentation of observe states it works for
both, batch and streaming. And in batch mode, the helper
class Observation helps reducing code and avoiding repetition.

The PySpark implementation of the Observation class can
implement *all* methods by merely calling into their JVM
counterpart, where the locking, listening, registration and
un-registration happens. I think this qualifies as: "all the
logic happens in the JVM". All that is transferred to Python
is a row's data. No listeners needed.

Enrico



Am 16.03.21 um 00:13 schrieb Jungtaek Lim:

If I remember correctly, the major audience of the "observe"
API is Structured Streaming, micro-batch mode. From the
example, the abstraction in 2 isn't something working with
Structured Streaming. It could be still done with callback,
but it remains the question how much complexity is hidden
from abstraction.

I see you're focusing on PySpark - I'm not sure whether
there's intention on not exposing query listener / streaming
query listener to PySpark, but if there's some valid reason
to do so, I'm not sure we do like to expose them to PySpark
in any way. 2 isn't making sense to me with PySpark - how do
you ensure all the logic is happening in the JVM and you can
leverage these values from PySpark?
(I see there's support for listeners with DStream in
PySpark, so there might be reasons not to add the same for
SQL/SS. Probably a lesson learned?)


On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack
<m...@enrico.minack.dev> wrote:

Hi Spark-Devs,

the observable metrics that have been added to the
Dataset API in 3.0.0 are a great improvement over the
Accumulator APIs that seem to have much weaker
guarantees. I have two questions regarding follow-up
contributions:

*1. Add observe to Python ***DataFrame**

As I can see from master branch, there is no equivalent
in the Python API. Is this something planned to happen,
or is it missing because there are not listeners in
PySpark which renders this method useless in Python. I
would be happy to contribute here.

*2. Add Observation class to simplify result access
*

The Dataset.observe method requires users to register
listeners

<https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
with QueryExecutionListener or StreamingQueryListener to
obtain the result. I think for simple setups, this could
  

Re: Observable Metrics on Spark Datasets

2021-03-19 Thread Enrico Minack

I'll sketch out a PR so we can talk code and move the discussion there.



On 18.03.21 at 14:55, Wenchen Fan wrote:
I think a listener-based API makes sense for streaming (since you need 
to keep watching the result), but may not be very reasonable for batch 
queries (you only get the result once). The idea of Observation looks 
good, but we should define what happens if `observation.get` is called 
before the batch query finishes.


Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim 
<kabhwan.opensou...@gmail.com> wrote:


Please follow up the discussion in the origin PR.
https://github.com/apache/spark/pull/26127
<https://github.com/apache/spark/pull/26127>

Dataset.observe() relies on the query listener for the batch query
which is an "unstable" API - that's why we decided to not add an
example for the batch query. For streaming query, it relies on the
streaming query listener which is a stable API. That said,
personally I'd consider the new API to be fit to the streaming
query first, and see whether it fits to the batch query as well.

If we found Dataset.observe() to be useful enough on the batch
query, we'd probably be better to discuss how to provide these
metrics against a stable API (so that Scala users could leverage
it), and look back later for PySpark. That looks to be the first
one to do if we have a consensus on the usefulness of observable
metrics on batch query.


On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack
<m...@enrico.minack.dev> wrote:

I am focusing on batch mode, not streaming mode. I would argue
that Dataset.observe() is equally useful for large batch
processing. If you need some motivating use cases, please let
me know.

Anyhow, the documentation of observe states it works for both,
batch and streaming. And in batch mode, the helper class
Observation helps reducing code and avoiding repetition.

The PySpark implementation of the Observation class can
implement *all* methods by merely calling into their JVM
counterpart, where the locking, listening, registration and
un-registration happens. I think this qualifies as: "all the
logic happens in the JVM". All that is transferred to Python
is a row's data. No listeners needed.

Enrico



On 16.03.21 at 00:13, Jungtaek Lim wrote:

If I remember correctly, the major audience of the "observe"
API is Structured Streaming, micro-batch mode. From the
example, the abstraction in 2 isn't something working with
Structured Streaming. It could be still done with callback,
but it remains the question how much complexity is hidden
from abstraction.

I see you're focusing on PySpark - I'm not sure whether
there's intention on not exposing query listener / streaming
query listener to PySpark, but if there's some valid reason
to do so, I'm not sure we do like to expose them to PySpark
in any way. 2 isn't making sense to me with PySpark - how do
you ensure all the logic is happening in the JVM and you can
leverage these values from PySpark?
(I see there's support for listeners with DStream in PySpark,
so there might be reasons not to add the same for SQL/SS.
Probably a lesson learned?)


On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack
<m...@enrico.minack.dev> wrote:

Hi Spark-Devs,

the observable metrics that have been added to the
Dataset API in 3.0.0 are a great improvement over the
Accumulator APIs that seem to have much weaker
guarantees. I have two questions regarding follow-up
contributions:

*1. Add observe to Python ***DataFrame**

As I can see from master branch, there is no equivalent
in the Python API. Is this something planned to happen,
or is it missing because there are not listeners in
PySpark which renders this method useless in Python. I
would be happy to contribute here.

*2. Add Observation class to simplify result access
*

The Dataset.observe method requires users to register
listeners

<https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
with QueryExecutionListener or StreamingQueryListener to
obtain the result. I think for simple setups, this could
be hidden behind a common helper class here called
Observation, which reduces the usage of observe to three
 

Re: Observable Metrics on Spark Datasets

2021-03-16 Thread Enrico Minack
I am focusing on batch mode, not streaming mode. I would argue that 
Dataset.observe() is equally useful for large batch processing. If you 
need some motivating use cases, please let me know.


Anyhow, the documentation of observe states it works for both, batch and 
streaming. And in batch mode, the helper class Observation helps 
reducing code and avoiding repetition.


The PySpark implementation of the Observation class can implement *all* 
methods by merely calling into their JVM counterpart, where the locking, 
listening, registration and un-registration happens. I think this 
qualifies as: "all the logic happens in the JVM". All that is 
transferred to Python is a row's data. No listeners needed.


Enrico



On 16.03.21 at 00:13, Jungtaek Lim wrote:
If I remember correctly, the major audience of the "observe" API is 
Structured Streaming, micro-batch mode. From the example, the 
abstraction in 2 isn't something working with Structured Streaming. It 
could be still done with callback, but it remains the question how 
much complexity is hidden from abstraction.


I see you're focusing on PySpark - I'm not sure whether there's 
intention on not exposing query listener / streaming query listener to 
PySpark, but if there's some valid reason to do so, I'm not sure we do 
like to expose them to PySpark in any way. 2 isn't making sense to me 
with PySpark - how do you ensure all the logic is happening in the JVM 
and you can leverage these values from PySpark?
(I see there's support for listeners with DStream in PySpark, so there 
might be reasons not to add the same for SQL/SS. Probably a lesson 
learned?)



On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev> wrote:


Hi Spark-Devs,

the observable metrics that have been added to the Dataset API in
3.0.0 are a great improvement over the Accumulator APIs that seem
to have much weaker guarantees. I have two questions regarding
follow-up contributions:

*1. Add observe to Python ***DataFrame**

As I can see from master branch, there is no equivalent in the
Python API. Is this something planned to happen, or is it missing
because there are not listeners in PySpark which renders this
method useless in Python. I would be happy to contribute here.

*2. Add Observation class to simplify result access
*

The Dataset.observe method requires users to register listeners

<https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
with QueryExecutionListener or StreamingQueryListener to obtain
the result. I think for simple setups, this could be hidden behind
a common helper class here called Observation, which reduces the
usage of observe to three lines of code:

// our Dataset (this does not count as a line of code)
val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

// define the observation we want to make
val observation = Observation("stats", count($"id"), sum($"id"))

// add the observation to the Dataset and execute an action on it
val cnt = df.observe(observation).count()

// retrieve the result
assert(observation.get === Row(4, 15))

The Observation class can handle the registration and
de-registration of the listener, as well as properly accessing the
result across thread boundaries.

With *2.*, the observe method can be added to PySpark without
introducing listeners there at all. All the logic is happening in
the JVM.

Thanks for your thoughts on this.

Regards,
Enrico



Observable Metrics on Spark Datasets

2021-03-15 Thread Enrico Minack

Hi Spark-Devs,

the observable metrics that have been added to the Dataset API in 3.0.0 
are a great improvement over the Accumulator APIs that seem to have much 
weaker guarantees. I have two questions regarding follow-up contributions:


*1. Add observe to Python ***DataFrame**

As I can see from master branch, there is no equivalent in the Python 
API. Is this something planned to happen, or is it missing because there 
are not listeners in PySpark which renders this method useless in 
Python. I would be happy to contribute here.


*2. Add Observation class to simplify result access
*

The Dataset.observe method requires users to register listeners 
with QueryExecutionListener or StreamingQueryListener to obtain the 
result. I think for simple setups, this could be hidden behind a common 
helper class here called Observation, which reduces the usage of observe 
to three lines of code:


// our Dataset (this does not count as a line of code)
val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

// define the observation we want to make
val observation = Observation("stats", count($"id"), sum($"id"))

// add the observation to the Dataset and execute an action on it
val cnt = df.observe(observation).count()

// retrieve the result
assert(observation.get === Row(4, 15))

The Observation class can handle the registration and de-registration of 
the listener, as well as properly accessing the result across thread 
boundaries.
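To make this concrete, a hedged sketch of what such a helper could look 
like (an illustration only, not the actual Observation class discussed 
here; it relies on QueryExecution.observedMetrics and a SparkSession 
named `spark`):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class SimpleObservation(name: String)(implicit spark: SparkSession) {
  @volatile private var row: Option[Row] = None

  private val listener = new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
      qe.observedMetrics.get(name).foreach { r =>
        SimpleObservation.this.synchronized {
          row = Some(r)
          SimpleObservation.this.notifyAll()
        }
      }
    override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = ()
  }

  spark.listenerManager.register(listener)

  // blocks until the observed metrics row has been reported, then unregisters
  def get: Row = synchronized {
    while (row.isEmpty) wait()
    spark.listenerManager.unregister(listener)
    row.get
  }
}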


With *2.*, the observe method can be added to PySpark without 
introducing listeners there at all. All the logic is happening in the JVM.


Thanks for your thoughts on this.

Regards,
Enrico



Re: Auto-closing PRs or How to get reviewers' attention

2021-02-23 Thread Enrico Minack

On 18.02.21 at 16:34, Sean Owen wrote:
One other aspect is that a committer is taking some degree of 
responsibility for merging a change, so the ask is more than just a 
few minutes of eyeballing. If it breaks something the merger pretty 
much owns resolving it, and, the whole project owns any consequence of 
the change for the future.


I think this explains the hesitation pretty well: Committers take 
ownership of the change. It is understandable that PRs then have to be 
very convincing with low risk/benefit ratio.


Are there plans or initiatives to proactively widen the base of 
committers to mitigate the current situation?


Enrico





Auto-closing PRs or How to get reviewers' attention

2021-02-18 Thread Enrico Minack

Hi Spark Developers,

I have a fundamental question on the process of contributing to Apache 
Spark from outside the circle of committers.


I have gone through a number of pull requests and I always found it hard 
to get feedback, especially from committers. I understand there is a 
very high competition for getting attention of those few committers. 
Given Spark's code base is so huge, only very few people will feel 
comfortable approving code changes for a specific code section. Still, 
the motivation of those that want to contribute suffers from this.


In particular, I am getting annoyed by the auto-closing PR feature on 
GitHub. I understand the usefulness of this feature for such a high-traffic 
project, but I personally am impacted by the weaknesses of this 
approach. I hope this can be improved.


The feature first warns in advance that it is "... closing this PR 
because it hasn't been updated in a while". This comment looks a bit 
silly in situations where the contributor is waiting for committers' 
feedback.


*What is the approved way to ...*

*... prevent it from being auto-closed?* Committing and commenting to 
the PR does not prevent it from being closed the next day.


*...**re-open it? *The comment says "If you'd like to revive this PR, 
please reopen it ...", but there is no re-open button anywhere on the PR!


*... remove the Stale tag?* The comment says "...  ask a committer to 
remove the Stale tag!". Where can I find a list of committers and their 
contact details? What is the best way to contact them? E-Mail? 
Mentioning them in a PR comment?


*... find the right committer to review a PR?* The contributors page 
states "ping likely reviewers", but it does not state how to identify 
likely reviewers. Do you recommend git-blaming the relevant code 
section? What if those committers are not available any more? Whom to 
ask next?


*... contact committers to get their attention?* Cc'ing them in PR 
comments? Sending E-Mails? Doesn't that contribute to their cognitive load?


What is the expected contributor's response to a PR that does not get 
feedback? Giving up?


Are there processes in place to increase the probability PRs do not get 
forgotten, auto-closed and lost?



This is not about my specific pull requests or reviewers of those. I 
appreciate their time and engagement.
This is about the general process of getting feedback and needed 
improvements for it in order to increase contributor community happiness.


Cheers,
Enrico



Re: is there any tool to visualize the spark physical plan or spark plan

2020-05-02 Thread Enrico Minack

Kelly Zhang,

You can add a SparkListener to your Spark context:

sparkContext.addSparkListener(new SparkListener {})


That listener can override onTaskEnd, which provides a 
SparkListenerTaskEnd for each task. That instance gives you access to 
the task metrics.
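A minimal sketch of such a listener (assuming a SparkSession named `spark`; 
the printed fields are just examples of the metrics available per task):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val metricsListener = new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"stage ${taskEnd.stageId}: run time ${metrics.executorRunTime} ms, " +
        s"records read ${metrics.inputMetrics.recordsRead}")
    }
  }
}

spark.sparkContext.addSparkListener(metricsListener)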


See:

- 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/scheduler/SparkListener.html#onTaskEnd-org.apache.spark.scheduler.SparkListenerTaskEnd-
- 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html#taskMetrics--


Regards,
Enrico


On 02.05.20 at 00:45, zhangliyun wrote:




Hi Wenchen Fan:
thanks for the reply. In the link, I saw SQL metrics, which is very useful.
```
SQL metrics

The metrics of SQL operators are shown in the block of physical 
operators. The SQL metrics can be useful when we want to dive into the 
execution details of each operator. For example, “number of output 
rows” can answer how many rows are output after a Filter operator, 
“shuffle bytes written total” in an Exchange operator shows the number 
of bytes written by a shuffle.


Here is the list of SQL metrics:



My question is: apart from reading these metrics in the Spark web UI, is 
there any way to read the metrics on the driver side in code?



Best regards

Kelly Zhang



At 2020-04-30 21:38:56, "Wenchen Fan"  wrote:

Does the Spark SQL web UI work for you?
https://spark.apache.org/docs/3.0.0-preview/web-ui.html#sql-tab

On Thu, Apr 30, 2020 at 5:30 PM Manu Zhang
<owenzhang1...@gmail.com> wrote:

Hi Kelly,

If you can parse the event log, then try listening on the
`SparkListenerSQLExecutionStart` event and build a
`SparkPlanGraph` like

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L306.

`SparkPlanGraph` has a `makeDotFile` method  where you can
write out a `.dot` file and visualize it with Graphviz tools,
e.g. http://www.webgraphviz.com/

Thanks,
Manu

On Thu, Apr 30, 2020 at 3:21 PM zhangliyun <kelly...@126.com> wrote:

Hi all,
I want to ask a question: is there any tool to visualize
the Spark physical plan or Spark plan? Sometimes the
physical plan is very long, so it is difficult to view it.

Best Regards
KellyZhang








Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Enrico Minack

Abhinav,

you can repartition by your key, then sortWithinPartitions, and then 
groupByKey. Since the data are already hash-partitioned by key, Spark should 
not shuffle the data and hence not change the sort within each partition:


ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")
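A slightly fuller sketch of that suggestion, reusing the key_class from the 
question quoted below (whether the in-partition order survives the grouping 
is exactly what is being asked here, so treat it as something to verify):

val sorted = df.as[key_class]
  .repartition($"key")
  .sortWithinPartitions($"key", $"code")

sorted
  .groupByKey(_.key)
  .mapGroups { (key, rows) =>
    // rows are expected to arrive ordered by code within each key
    rows.map(_ => "SUCCESS").toList
  }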

Enrico


On 26.03.20 at 17:53, Ranjan, Abhinav wrote:


Hi,

I have a dataframe which has data like:

key    |    code    |    code_value
1      |    c1      |    11
1      |    c2      |    12
1      |    c2      |    9
1      |    c3      |    12
1      |    c2      |    13
1      |    c2      |    14
1      |    c4      |    12
1      |    c2      |    15
1      |    c1      |    12


I need to group the data based on key and then apply some custom logic 
to every value in each group. So I did this:


lets suppose it is in a dataframe df.

case class key_class(key: String, code: String, code_value: String)


df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (key, groupedValues) =>
    groupedValues.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
  }
  .toDF("status")


The issue with the above approach is that the values I get after applying 
groupByKey are not sorted/ordered. I want the values to be sorted by 
the column 'code'.


There is a way to do this:

1. Collect them into a list and then sort ==> this will result in OOM 
if the iterator is too big.


2. Somehow apply a secondary sort, but the problem with that approach is 
that I have to keep track of key changes.


3. sortWithinPartitions cannot be applied because groupBy will mess up 
the order.


4. Another approach is:

df
  .as[key_class]
  .sort("key", "code")
  .map { row =>
    // do stuff here
    row
  }

but here also I have to keep track of key changes within the map 
function, and sometimes this also overflows if the keys are skewed.



So is there any way in which I can get the values sorted after 
grouping them by a key?


Thanks,

Abhinav





Re: comparable and orderable CalendarInterval

2020-03-05 Thread Enrico Minack
There is another feature missing for CalendarInterval, which is related 
to comparability: measuring the length of an interval.


It would be nice if you could access the length of an interval; then you 
could compute something like this:


Seq((Timestamp.valueOf("2020-02-01 12:00:00"), Timestamp.valueOf("2020-02-01 13:30:25")))
  .toDF("start", "end")
  .withColumn("interval", $"end" - $"start")
  .withColumn("interval [h]", *INTERVAL LENGTH IN HOURS*)
  .withColumn("rate [€/h]", lit(1.45))
  .withColumn("price [€]", $"interval [h]" * $"rate [€/h]")
  .show(false)

+-------------------+-------------------+-----------------------------+------------+----------+----------+
|start              |end                |interval                     |interval [h]|rate [€/h]|price [€] |
+-------------------+-------------------+-----------------------------+------------+----------+----------+
|2020-02-01 12:00:00|2020-02-01 13:30:25|1 hours 30 minutes 25 seconds|1.5069      |1.45      |2.18506943|
+-------------------+-------------------+-----------------------------+------------+----------+----------+



The length of an interval can be measured by dividing it by the length 
of your measuring unit, e.g. "1 hour":


$"interval" / lit("1 hour").cast(CalendarIntervalType)


Which brings us to CalendarInterval division: 
https://github.com/apache/spark/pull/27805


Enrico


On 11.02.20 at 21:09, Enrico Minack wrote:
I compute the difference of two timestamps and compare it with a 
constant interval:


Seq(("2019-01-02 12:00:00", "2019-01-02 13:30:00"))
  .toDF("start", "end")
.select($"start".cast(TimestampType), $"end".cast(TimestampType))
  .select($"start", $"end", ($"end" - $"start").as("diff"))
  .where($"diff" < lit("INTERVAL 2 HOUR").cast(CalendarIntervalType))
  .show

Coming from timestamps, the interval should have correct hours 
(millisecond component), so comparing it with the "right kinds of 
intervals" should always be correct.


Enrico


On 11.02.20 at 17:06, Wenchen Fan wrote:
What's your use case to compare intervals? It's tricky in Spark as 
there is only one interval type and you can't really compare one 
month with 30 days.


On Wed, Feb 12, 2020 at 12:01 AM Enrico Minack <m...@enrico.minack.dev> wrote:


Hi Devs,

I would like to know what is the current roadmap of making
CalendarInterval comparable and orderable again (SPARK-29679,
SPARK-29385, #26337).

With #27262, this got reverted but SPARK-30551 does not mention
how to
go forward in this matter. I have found SPARK-28494, but this
seems to
be stale.

While I find it useful to compare such intervals, I cannot find a
way to
work around the missing comparability. Is there a way to get,
e.g. the
seconds that an interval represents to be able to compare
intervals? In
org.apache.spark.sql.catalyst.util.IntervalUtils there are
methods like
getEpoch or getDuration, which I cannot see are exposed to SQL or
in the
org.apache.spark.sql.functions package.

Thanks for the insights,
Enrico


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
<mailto:dev-unsubscr...@spark.apache.org>







[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-02-26 Thread Enrico Minack
I have created a jira to track this request: 
https://issues.apache.org/jira/browse/SPARK-30957


Enrico

On 08.02.20 at 16:56, Enrico Minack wrote:


Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> 
version of join(Dataset[_], Seq[String]) would be useful.


Does any PMC consider this useful enough to be added to the Dataset 
API? I'd be happy to create a PR in that case.


Enrico



-------- Forwarded Message --------
Subject: dataframe null safe joins given a list of columns
Date:    Thu, 6 Feb 2020 12:45:11 +
From:    Marcelo Valle 
To:      user @spark 



I was surprised I couldn't find a way of solving this in Spark, as it 
must be a very common problem for users, so I decided to ask here.


Consider the code below:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
"c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
"d4")).toDF("a", "b", "d")

df1.join(df2, joinColumns).show()
```

The output is :

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+-+---+---+
|  a|    b|  c|  d|
+---+-+---+---+
| a1|   b1| c1| d1|
| a4| null| c4| d4|
+---+-+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, 
as it doesn't create duplicate columns by default. However, it uses 
the operator `===` to join, not the null safe one `<=>`.


Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---++---+---++---+
|  a|   b|  c|  a|   b|  d|
+---++---+---++---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---++---+---++---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
df2("b")).drop(df2("a")).drop(df2("b")).show()

+---++---+---+
|  a|   b|  c|  d|
+---++---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---++---+---+
```

Which works, but is really verbose, especially when you have many join 
columns.


Is there a better way of solving this without needing a 
utility method? This same problem is something I find in every spark 
project.
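
One possible utility method (a sketch, not part of the Spark API; the names 
joinNullSafe, df1, df2 and joinColumns are just taken from the example above) 
that hides the verbosity could look like this:

```
import org.apache.spark.sql.DataFrame

// hypothetical helper: null-safe equi-join on a list of column names,
// dropping the duplicated right-side join columns afterwards
def joinNullSafe(left: DataFrame, right: DataFrame, columns: Seq[String]): DataFrame = {
  val condition = columns.map(c => left(c) <=> right(c)).reduce(_ && _)
  columns.foldLeft(left.join(right, condition))((joined, c) => joined.drop(right(c)))
}

joinNullSafe(df1, df2, joinColumns).show()
```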










Re: comparable and orderable CalendarInterval

2020-02-11 Thread Enrico Minack
I compute the difference of two timestamps and compare it with a 
constant interval:


Seq(("2019-01-02 12:00:00", "2019-01-02 13:30:00"))
  .toDF("start", "end")
  .select($"start".cast(TimestampType), $"end".cast(TimestampType))
  .select($"start", $"end", ($"end" - $"start").as("diff"))
  .where($"diff" < lit("INTERVAL 2 HOUR").cast(CalendarIntervalType))
  .show

Coming from timestamps, the interval should have correct hours 
(millisecond component), so comparing it with the "right kinds of 
intervals" should always be correct.


Enrico


On 11.02.20 at 17:06, Wenchen Fan wrote:
What's your use case to compare intervals? It's tricky in Spark as 
there is only one interval type and you can't really compare one month 
with 30 days.


On Wed, Feb 12, 2020 at 12:01 AM Enrico Minack <m...@enrico.minack.dev> wrote:


Hi Devs,

I would like to know what is the current roadmap of making
CalendarInterval comparable and orderable again (SPARK-29679,
SPARK-29385, #26337).

With #27262, this got reverted but SPARK-30551 does not mention
how to
go forward in this matter. I have found SPARK-28494, but this
seems to
be stale.

While I find it useful to compare such intervals, I cannot find a
way to
work around the missing comparability. Is there a way to get, e.g.
the
seconds that an interval represents to be able to compare
intervals? In
org.apache.spark.sql.catalyst.util.IntervalUtils there are methods
like
getEpoch or getDuration, which I cannot see are exposed to SQL or
in the
org.apache.spark.sql.functions package.

Thanks for the insights,
Enrico


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
<mailto:dev-unsubscr...@spark.apache.org>





comparable and orderable CalendarInterval

2020-02-11 Thread Enrico Minack

Hi Devs,

I would like to know what is the current roadmap of making 
CalendarInterval comparable and orderable again (SPARK-29679, 
SPARK-29385, #26337).


With #27262, this got reverted but SPARK-30551 does not mention how to 
go forward in this matter. I have found SPARK-28494, but this seems to 
be stale.


While I find it useful to compare such intervals, I cannot find a way to 
work around the missing comparability. Is there a way to get, e.g. the 
seconds that an interval represents to be able to compare intervals? In 
org.apache.spark.sql.catalyst.util.IntervalUtils there are methods like 
getEpoch or getDuration, which I cannot see are exposed to SQL or in the 
org.apache.spark.sql.functions package.
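
For what it's worth, a workaround that avoids intervals entirely (a sketch, 
assuming a DataFrame df with timestamp columns start and end, and 
spark.implicits._ in scope for the $ syntax): casting a timestamp to long 
yields seconds since the epoch, so durations can be compared as plain numbers.

df.withColumn("diffSeconds", $"end".cast("long") - $"start".cast("long"))
  .where($"diffSeconds" < 2 * 3600)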


Thanks for the insights,
Enrico


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Fwd: dataframe null safe joins given a list of columns

2020-02-08 Thread Enrico Minack

Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> 
version of join(Dataset[_], Seq[String]) would be useful.


Does any PMC consider this useful enough to be added to the Dataset API? 
I'd be happy to create a PR in that case.


Enrico



-------- Forwarded Message --------
Subject: dataframe null safe joins given a list of columns
Date:    Thu, 6 Feb 2020 12:45:11 +
From:    Marcelo Valle 
To:      user @spark 



I was surprised I couldn't find a way of solving this in Spark, as it 
must be a very common problem for users, so I decided to ask here.


Consider the code below:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
"c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
"d4")).toDF("a", "b", "d")

df1.join(df2, joinColumns).show()
```

The output is :

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+-+---+---+
|  a|    b|  c|  d|
+---+-+---+---+
| a1|   b1| c1| d1|
| a4| null| c4| d4|
+---+-+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, as 
it doesn't create duplicate columns by default. However, it uses the 
operator `===` to join, not the null safe one `<=>`.


Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---++---+---++---+
|  a|   b|  c|  a|   b|  d|
+---++---+---++---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---++---+---++---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
df2("b")).drop(df2("a")).drop(df2("b")).show()

+---++---+---+
|  a|   b|  c|  d|
+---++---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---++---+---+
```

Which works, but is really verbose, especially when you have many join 
columns.


Is there a better way of solving this without needing a utility method? 
This same problem is something I find in every spark project.








Re: [SPARK-30319][SQL] Add a stricter version of as[T]

2020-01-08 Thread Enrico Minack
Yes, as[T] is lazy, as any transformation is, but in terms of data 
processing, not schema. You seem to imply that as[T] is lazy in terms of 
the schema, but I do not know of any other transformation that behaves 
like this.


Your proposed solution works, because the map transformation returns the 
right schema, though it is also a lazy transformation. The as[T] should 
behave like this too.


The map transformation is a quick fix in terms of code length, but it 
materializes the data as instances of T, which introduces a prohibitive 
deserialization/serialization round trip for no good reason.


I think returning the right schema does not need to touch any data and 
should be as lightweight as a projection.
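
For comparison, both workarounds side by side (a sketch, assuming T is a 
concrete case class and spark.implicits._ is in scope):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.col

// the map(identity) workaround: correct schema, but a full
// deserialization/serialization round trip per row
val viaMap = df.as[T].map(identity)

// a projection-only alternative: select exactly the columns of T, then as[T]
val columnsOfT = Encoders.product[T].schema.fieldNames.map(col)
val viaSelect = df.select(columnsOfT: _*).as[T]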


Enrico


On 07.01.20 at 10:13, Wenchen Fan wrote:
I think it's simply because as[T] is lazy. You will see the right 
schema if you do `df.as[T].map(identity)`.




On Tue, Jan 7, 2020 at 4:42 PM Enrico Minack <m...@enrico.minack.dev> wrote:


Hi Devs,

I'd like to propose a stricter version of as[T]. Given the
interface def as[T](): Dataset[T], it is counter-intuitive that
the schema of the returned Dataset[T] is not agnostic to the
schema of the originating Dataset. The schema should always be
derived only from T.

I am proposing a stricter version so that user code does not need
to pair an .as[T] with a select(schemaOfT.fields.map(col(_.name)):
_*) whenever your code expects Dataset[T] to really contain only
columns of T.

https://github.com/apache/spark/pull/26969

Regards,
Enrico





[SPARK-30319][SQL] Add a stricter version of as[T]

2020-01-07 Thread Enrico Minack

Hi Devs,

I'd like to propose a stricter version of as[T]. Given the interface def 
as[T](): Dataset[T], it is counter-intuitive that the schema of the 
returned Dataset[T] is not agnostic to the schema of the originating 
Dataset. The schema should always be derived only from T.


I am proposing a stricter version so that user code does not need to 
pair an .as[T] with a select(schemaOfT.fields.map(col(_.name)): _*) 
whenever your code expects Dataset[T] to really contain only columns of T.


https://github.com/apache/spark/pull/26969

Regards,
Enrico



[SPARK-30296][SQL] Add Dataset diffing feature

2020-01-07 Thread Enrico Minack

Hi Devs,

I'd like to get your thoughts on this Dataset feature proposal. 
Comparing datasets is a central operation when regression testing your 
code changes.


It would be super useful if Spark's Datasets provide this transformation 
natively.
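
For context, this is roughly what the feature is about; a crude 
approximation with the existing API (a sketch, not the proposed 
transformation) would be:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// rows that exist only on one side, tagged with the side they come from
def diff(left: DataFrame, right: DataFrame): DataFrame =
  left.exceptAll(right).withColumn("side", lit("left"))
    .unionByName(right.exceptAll(left).withColumn("side", lit("right")))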


https://github.com/apache/spark/pull/26936

Regards,
Enrico


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [DISCUSS] Expensive deterministic UDFs

2019-11-08 Thread Enrico Minack
I agree that 'non-deterministic' is the right term for what it currently 
does: mark an expression as non-deterministic (returns different values 
for the same input, e.g. rand()), and the optimizer does its best to not 
break semantics when moving expressions around.


In the case of expensive deterministic UDFs, or any expensive deterministic 
expression, the optimizer should not multiply effort. Even in the case of 
cheap expressions like a * 5, where the performance impact is comparatively 
small, it simply should not execute things twice. So this is not about 
expensive deterministic expressions, but about deterministic expressions 
that get referenced multiple times.


Pushing those expressions into other expressions that reference them is 
useful in order to simplify those other expressions, e.g. 
df.withColumn("b", not($"a")).where(not($"b")) will eliminate the double 
negation of a.


So if expressions are referenced multiple times, they should not be 
collapsed, unless the referencing expressions get simplified. And then the 
simplification must outweigh the cost of evaluating the referenced 
expression twice. This needs some kind of cost model, or at least heuristics.


In case of UDFs, I think they should never be collapsed because they 
cannot be used to simplify other expressions (can they?). They should 
rather be materialised as close to the first reference as possible. If 
executing the UDF and referencing it multiple times happen in the same 
stage, hence in the same generated code, we end up with the perfect 
situation where the materialised result per call is held in 
memory and processed by all referencing expressions.


I agree that marking UDFs as expensive is not the right approach here; they 
should simply not be executed multiple times.
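
For reference, the workaround touched on in this thread (a sketch, assuming 
spark.implicits._ for the $ syntax and a DataFrame df with an id column, as 
in the quoted example; asNondeterministic keeps the optimizer from collapsing 
the projections and duplicating the call):

import org.apache.spark.sql.functions.udf

val f: Int => Array[Int] = i => Array(i, i + 1)
val udfF = udf(f).asNondeterministic()

df.select($"id", udfF($"id").as("array"))
  .select($"array"(0).as("array0"), $"array"(1).as("array1"))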


Enrico


On 08.11.19 at 04:26, Wenchen Fan wrote:
We really need some documents to define what non-deterministic means. 
AFAIK, non-deterministic expressions may produce a different result 
for the same input row, if the already processed input rows are 
different.


The optimizer tries its best to not change the input sequence 
of non-deterministic expressions. For example, `df.select(..., 
nonDeterministicExpr).filter...` can't do filter pushdown. An 
exception is filter condition. For `df.filter(nonDeterministic && 
cond)`, Spark still pushes down `cond` even if it may change the input 
sequence of the first condition. This is to respect the SQL semantic 
that filter conditions ANDed together are order-insensitive. Users 
should write `df.filter(nonDeterministic).filter(cond)` to guarantee 
the order.


For this particular problem, I think it's not only about UDF, but a 
general problem with how Spark collapses Projects.
For example, `df.select('a * 5 as 'b).select('b + 2, 'b + 3)`: Spark 
optimizes it to `df.select('a * 5 + 2, 'a * 5 + 3)` and executes 'a * 
5 twice.


I think we should revisit this optimization and think about when we 
can collapse.


On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel <rbereng...@gmail.com> wrote:


That was very interesting, thanks Enrico.

Sean, IIRC it also prevents push down of the UDF in Catalyst in
some cases.

Regards,

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
>
> Interesting, what does non-deterministic do except have this effect?
> aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag
with
> the same semantics just over naming. If anything 'expensive' also
> isn't the right word, more like 'try not to evaluate multiple
times'.
>
> Why isn't caching the answer? I realize it's big, but you can
cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
>
> Usually I'd say, can you redesign your UDF and code to be more
> efficient too? or use a big a cluster if that's really what you
need.
>
> At first look, no I don't think this Spark-side workaround for
naming
> for your use case is worthwhile. There are existing better
solutions.
>
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>>
>> Hi all,
>>
>> Running expensive deterministic UDFs that return complex types,
followed by multiple references to those results cause Spark to
evaluate the UDF multiple times per row. This has been reported
and discussed before: SPARK-18748 SPARK-17728
>>
>>    val f: Int => Array[Int]
>>    val udfF = udf(f)
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .select($"array"(0).as("array0"), $"array"(1).as("a

Re: [SPARK-29176][DISCUSS] Optimization should change join type to CROSS

2019-11-06 Thread Enrico Minack
So you say the optimized inner join with no conditions is also a valid 
query?


Then I agree the optimizer is not breaking the query, hence it is not a bug.

Enrico

On 06.11.19 at 15:53, Sean Owen wrote:

You asked for an inner join but it turned into a cross-join. This
might be surprising, hence the error you can disable.
The query is not invalid in any case. It's just stopping you from
doing something you may not meant to, and which may be expensive.
However I think we've already changed the default to enable it in
Spark 3 anyway.

On Wed, Nov 6, 2019 at 8:50 AM Enrico Minack  wrote:

Hi,

I would like to discuss issue SPARK-29176 to see if this is considered a bug 
and if so, to sketch out a fix.

In short, the issue is that a valid inner join with condition gets optimized so 
that no condition is left, but the type is still INNER. Then 
CheckCartesianProducts throws an exception. The type should have changed to 
CROSS when it gets optimized in that way.

I understand that with spark.sql.crossJoin.enabled you can make Spark not throw 
this exception, but I think you should not need this work-around for a valid 
query.

Please let me know what you think about this issue and how I could fix it. It 
might affect more rules than the two given in the Jira ticket.

Thanks,
Enrico

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[SPARK-29176][DISCUSS] Optimization should change join type to CROSS

2019-11-06 Thread Enrico Minack

Hi,

I would like to discuss issue SPARK-29176 to see if this is considered a 
bug and if so, to sketch out a fix.


In short, the issue is that a valid inner join with condition gets 
optimized so that no condition is left, but the type is still INNER. 
Then CheckCartesianProducts throws an exception. The type should have 
changed to CROSS when it gets optimized in that way.


I understand that with spark.sql.crossJoin.enabled you can make Spark 
not throw this exception, but I think you should not need this 
work-around for a valid query.
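
For completeness, the work-around itself (a sketch; spark, df1 and df2 are 
placeholders here, not from the original report):

// globally allow cartesian products ...
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// ... or state the cross join explicitly for the one join in question
df1.crossJoin(df2)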


Please let me know what you think about this issue and how I could fix 
it. It might affect more rules than the two given in the Jira ticket.


Thanks,
Enrico