[jira] [Commented] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)

2020-03-06 Thread Alexander Tronchin-James (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053715#comment-17053715
 ] 

Alexander Tronchin-James commented on SPARK-29367:
--

The fix suggested above, adding ARROW_PRE_0_15_IPC_FORMAT=1 to 
SPARK_HOME/conf/spark-env.sh, did NOT resolve the issue for me.

Instead, I needed to set this environment variable in my Python environment, as 
described here: 
https://stackoverflow.com/questions/58269115/how-to-enable-apache-arrow-in-pyspark/58273294#58273294
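
For anyone hitting the same wall, here is a minimal sketch (my own, not from the 
issue) of applying the workaround in the driver script; spark.executorEnv.* is 
the documented way to pass an environment variable to executor processes, and 
the flag has to be visible wherever pyarrow runs:

{code:python}
import os

# Set the Arrow compatibility flag before pyarrow is imported anywhere.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Propagate the same flag to the Python workers on the executors.
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
{code}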

> pandas udf not working with latest pyarrow release (0.15.0)
> ---
>
> Key: SPARK-29367
> URL: https://issues.apache.org/jira/browse/SPARK-29367
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.4.0, 2.4.1, 2.4.3
>Reporter: Julien Peloton
>Assignee: Bryan Cutler
>Priority: Major
> Fix For: 2.4.5, 3.0.0
>
>
> Hi,
> I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my 
> pyspark jobs using pandas udf are failing with 
> java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 
> 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import BooleanType
> import pandas as pd
> 
> @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
> def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
>     """Apply simple quality cuts.
> 
>     Returns
>     -------
>     out: pandas.Series of booleans
>         True for a good alert, False for a bad alert.
>     """
>     mask = nbad.values == 0
>     mask *= rb.values >= 0.55
>     mask *= abs(magdiff.values) <= 0.1
>     return pd.Series(mask)
> 
> spark = SparkSession.builder.getOrCreate()
> 
> # Create a dummy DataFrame
> colnames = ["nbad", "rb", "magdiff"]
> df = spark.sparkContext.parallelize(
>     zip(
>         [0, 1, 0, 0],
>         [0.01, 0.02, 0.6, 0.01],
>         [0.02, 0.05, 0.1, 0.01]
>     )
> ).toDF(colnames)
> df.show()
> 
> # Apply cuts
> df = df\
>     .withColumn("toKeep", qualitycuts(*colnames))\
>     .filter("toKeep == true")\
>     .drop("toKeep")
> 
> # This will fail when pyarrow 0.15.0 is used
> df.show()
> {code}
> and the log is:
> {code}
> Driver stacktrace:
> 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
> Traceback (most recent call last):
>   File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
>     df.show()
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>   at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>   at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>   at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>   at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>   at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>   at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>   at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
>   at 
> 

[jira] [Created] (SPARK-29577) Implement p-value simulation and unit tests for chi2 test

2019-10-23 Thread Alexander Tronchin-James (Jira)
Alexander Tronchin-James created SPARK-29577:


 Summary: Implement p-value simulation and unit tests for chi2 test
 Key: SPARK-29577
 URL: https://issues.apache.org/jira/browse/SPARK-29577
 Project: Spark
  Issue Type: Improvement
  Components: ML, MLlib
Affects Versions: 2.4.5, 3.0.0
Reporter: Alexander Tronchin-James


Spark MLlib's chi-squared test does not yet include p-value simulation for the 
goodness-of-fit test. Implementing a robust, scalable approach was non-trivial 
for us, so we want to contribute this work back to the community for others to 
use.

https://github.com/apache/spark/pull/26197
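
As background, a minimal NumPy sketch (my own, independent of the linked PR, 
which implements this inside Spark) of what a simulated p-value for a 
goodness-of-fit test means: draw multinomial samples under the null hypothesis 
and count how often the simulated statistic is at least as extreme as the 
observed one.

{code:python}
import numpy as np

def simulated_chi2_gof_pvalue(observed, null_probs, n_sim=2000, seed=0):
    """Monte Carlo p-value for a chi-squared goodness-of-fit test."""
    observed = np.asarray(observed, dtype=float)
    expected = observed.sum() * np.asarray(null_probs, dtype=float)
    stat = ((observed - expected) ** 2 / expected).sum()

    # Simulate count vectors under the null and recompute the statistic.
    rng = np.random.default_rng(seed)
    sims = rng.multinomial(int(observed.sum()), null_probs, size=n_sim)
    sim_stats = ((sims - expected) ** 2 / expected).sum(axis=1)

    # The +1 terms keep the estimate strictly positive, as in R's chisq.test.
    return (1 + (sim_stats >= stat).sum()) / (n_sim + 1)

print(simulated_chi2_gof_pvalue([18, 22, 60], [0.25, 0.25, 0.5]))
{code}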






[jira] [Commented] (SPARK-14543) SQL/Hive insertInto has unexpected results

2019-07-18 Thread Alexander Tronchin-James (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-14543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888246#comment-16888246
 ] 

Alexander Tronchin-James commented on SPARK-14543:
--

OK, thanks!

> SQL/Hive insertInto has unexpected results
> --
>
> Key: SPARK-14543
> URL: https://issues.apache.org/jira/browse/SPARK-14543
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>Priority: Major
>
> *Updated description*
> There should be an option to match input data to output columns by name. The 
> API allows operations on tables, which hide the column resolution problem. 
> It's easy to copy from one table to another without listing the columns, and 
> in the API it is common to work with columns by name rather than by position. 
> I think the API should add a way to match columns by name, which is closer to 
> what users expect. I propose adding something like this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}
> *Original description*
> The Hive write path adds a pre-insertion cast (projection) to reconcile 
> incoming data columns with the outgoing table schema. Columns are matched by 
> position and casts are inserted to reconcile the two column schemas.
> When columns aren't correctly aligned, this causes unexpected results. I ran 
> into this by not using a correct {{partitionBy}} call (addressed by 
> SPARK-14459), which caused an error message that an int could not be cast to 
> an array. However, if the columns are vaguely compatible, for example string 
> and float, then no error or warning is produced and data is written to the 
> wrong columns using unexpected casts (string -> bigint -> float).
> A real-world use case that will hit this is when a table definition changes 
> by adding a column in the middle of a table. Spark SQL statements that copied 
> from that table to a destination table will then map the columns differently 
> but insert casts that mask the problem. The last column's data will be 
> dropped without a reliable warning for the user.
> This highlights a few problems:
> * Too many or too few incoming data columns should cause an AnalysisException 
> to be thrown
> * Only "safe" casts should be inserted automatically, like int -> long, using 
> UpCast
> * Pre-insertion casts currently ignore extra columns by using zip
> * The pre-insertion cast logic differs between Hive's MetastoreRelation and 
> LogicalRelation
> Also, I think there should be an option to match input data to output columns 
> by name. The API allows operations on tables, which hide the column 
> resolution problem. It's easy to copy from one table to another without 
> listing the columns, and in the API it is common to work with columns by name 
> rather than by position. I think the API should add a way to match columns by 
> name, which is closer to what users expect. I propose adding something like 
> this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}
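
A minimal, hypothetical PySpark sketch of the positional-matching pitfall 
described above (table names and values are illustrative, not from the issue):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("CREATE TABLE src (id BIGINT, count INT, total BIGINT)")
spark.sql("CREATE TABLE dst (id BIGINT, total BIGINT, count INT)")

spark.createDataFrame([(1, 2, 30)], ["id", "count", "total"]) \
    .write.insertInto("src")

# insertInto matches columns by POSITION, not by name: src.count is written
# into dst.total and src.total into dst.count, with silent casts hiding the swap.
spark.table("src").write.insertInto("dst")
spark.table("dst").show()
{code}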






[jira] [Commented] (SPARK-14543) SQL/Hive insertInto has unexpected results

2019-07-17 Thread Alexander Tronchin-James (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-14543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16887508#comment-16887508
 ] 

Alexander Tronchin-James commented on SPARK-14543:
--

Did the calling syntax change for this? I'm using 2.4.x and can't find anything 
about .byName on writers in the docs, but maybe I'm just bad at searching the 
docs...

> SQL/Hive insertInto has unexpected results
> --
>
> Key: SPARK-14543
> URL: https://issues.apache.org/jira/browse/SPARK-14543
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Ryan Blue
>Assignee: Ryan Blue
>Priority: Major
>
> *Updated description*
> There should be an option to match input data to output columns by name. The 
> API allows operations on tables, which hide the column resolution problem. 
> It's easy to copy from one table to another without listing the columns, and 
> in the API it is common to work with columns by name rather than by position. 
> I think the API should add a way to match columns by name, which is closer to 
> what users expect. I propose adding something like this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}
> *Original description*
> The Hive write path adds a pre-insertion cast (projection) to reconcile 
> incoming data columns with the outgoing table schema. Columns are matched by 
> position and casts are inserted to reconcile the two column schemas.
> When columns aren't correctly aligned, this causes unexpected results. I ran 
> into this by not using a correct {{partitionBy}} call (addressed by 
> SPARK-14459), which caused an error message that an int could not be cast to 
> an array. However, if the columns are vaguely compatible, for example string 
> and float, then no error or warning is produced and data is written to the 
> wrong columns using unexpected casts (string -> bigint -> float).
> A real-world use case that will hit this is when a table definition changes 
> by adding a column in the middle of a table. Spark SQL statements that copied 
> from that table to a destination table will then map the columns differently 
> but insert casts that mask the problem. The last column's data will be 
> dropped without a reliable warning for the user.
> This highlights a few problems:
> * Too many or too few incoming data columns should cause an AnalysisException 
> to be thrown
> * Only "safe" casts should be inserted automatically, like int -> long, using 
> UpCast
> * Pre-insertion casts currently ignore extra columns by using zip
> * The pre-insertion cast logic differs between Hive's MetastoreRelation and 
> LogicalRelation
> Also, I think there should be an option to match input data to output columns 
> by name. The API allows operations on tables, which hide the column 
> resolution problem. It's easy to copy from one table to another without 
> listing the columns, and in the API it is common to work with columns by name 
> rather than by position. I think the API should add a way to match columns by 
> name, which is closer to what users expect. I propose adding something like 
> this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}






[jira] [Commented] (SPARK-18829) Printing to logger

2019-07-17 Thread Alexander Tronchin-James (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16887450#comment-16887450
 ] 

Alexander Tronchin-James commented on SPARK-18829:
--

FWIW, the showString method on Dataset is private, so it doesn't seem possible 
to call it except from internal Dataset methods.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L295
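
A hedged PySpark-side workaround sketch (my own, not part of this issue): in 
PySpark, show() prints the string returned from the JVM with a plain print(), 
so its output can be captured and forwarded to a logger without touching 
showString directly.

{code:python}
import io
import logging
from contextlib import redirect_stdout

logger = logging.getLogger("df_show")

def log_show(df, n=20, truncate=True):
    """Send df.show() output to the logger as well as stdout."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.show(n, truncate)
    text = buf.getvalue()
    print(text, end="")        # keep the usual console behaviour
    logger.info("\n%s", text)
{code}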

> Printing to logger
> --
>
> Key: SPARK-18829
> URL: https://issues.apache.org/jira/browse/SPARK-18829
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.2
> Environment: ALL
>Reporter: David Hodeffi
>Priority: Trivial
>  Labels: easyfix, patch
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> I would like to print the output of dataframe.show() or df.explain(true) to a 
> log file. Right now the code prints to standard output with no way to redirect 
> it, and it cannot be configured through log4j.properties.
> My suggestion is to write to both the logger and standard output, i.e.:
> class DataFrame {
>   ...
>   override def explain(extended: Boolean): Unit = {
>     val explain = ExplainCommand(queryExecution.logical, extended = extended)
>     sqlContext.executePlan(explain).executedPlan.executeCollect().foreach { r =>
>       // scalastyle:off println
>       println(r.getString(0))
>       // scalastyle:on println
>       logger.debug(r.getString(0))
>     }
>   }
> 
>   def show(numRows: Int, truncate: Boolean): Unit = {
>     val str = showString(numRows, truncate)
>     println(str)
>     logger.debug(str)
>   }
> }






[jira] [Commented] (SPARK-12394) Support writing out pre-hash-partitioned data and exploit that in join optimizations to avoid shuffle (i.e. bucketing in Hive)

2016-08-25 Thread Alexander Tronchin-James (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438461#comment-15438461
 ] 

Alexander Tronchin-James commented on SPARK-12394:
--

Awesome news, Tejas!

The filter feature is secondary AFAIK, and I'd prioritize the sort-merge bucket
(SMB) join if I had the choice. Strong preference for supporting inner and
left/right/full outer joins between tables whose bucket counts differ by an
integer multiple, for selecting the number of executors (a rational multiple or
fraction of the number of buckets), and for selecting the number of emitted
buckets. Bonus points for an implementation that automatically applies SMB
joins and avoids re-sorts where possible. Maybe a tall order, but we know it
can be done. ;-)

If we don't get it all in the first pull request we can always iterate.
Thanks for pushing on this!
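
For readers landing here later, a hedged sketch (table names are mine) of the 
bucketing API this work led to in Spark 2.x: pre-bucketing and pre-sorting both 
sides on the join key lets a later sort-merge join be planned without a shuffle.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.range(1000000).withColumnRenamed("id", "key")
right = spark.range(1000000).withColumnRenamed("id", "key")

# Write both sides bucketed and sorted on the join key, with matching bucket counts.
left.write.bucketBy(16, "key").sortBy("key").saveAsTable("left_bucketed")
right.write.bucketBy(16, "key").sortBy("key").saveAsTable("right_bucketed")

# With compatible bucketing, Spark can plan the sort-merge join without an
# Exchange on either side; inspect the physical plan to confirm.
joined = spark.table("left_bucketed").join(spark.table("right_bucketed"), "key")
joined.explain()
{code}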






> Support writing out pre-hash-partitioned data and exploit that in join 
> optimizations to avoid shuffle (i.e. bucketing in Hive)
> --
>
> Key: SPARK-12394
> URL: https://issues.apache.org/jira/browse/SPARK-12394
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Nong Li
> Fix For: 2.0.0
>
> Attachments: BucketedTables.pdf
>
>
> In many cases users know ahead of time the columns that they will be joining 
> or aggregating on.  Ideally they should be able to leverage this information 
> and pre-shuffle the data so that subsequent queries do not require a shuffle. 
>  Hive supports this functionality by allowing the user to define buckets, 
> which are hash partitioning of the data based on some key.
>  - Allow the user to specify a set of columns when caching or writing out data
>  - Allow the user to specify some parallelism
>  - Shuffle the data when writing / caching such that it's distributed by these 
> columns
>  - When planning/executing a query, use this distribution to avoid another 
> shuffle when reading, assuming the join or aggregation is compatible with the 
> columns specified
>  - Should work with existing save modes: append, overwrite, etc.
>  - Should work at least with all Hadoop FS data sources
>  - Should work with any data source when caching






[jira] [Commented] (SPARK-12394) Support writing out pre-hash-partitioned data and exploit that in join optimizations to avoid shuffle (i.e. bucketing in Hive)

2016-08-22 Thread Alexander Tronchin-James (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431322#comment-15431322
 ] 

Alexander Tronchin-James commented on SPARK-12394:
--

Where can we read about/contribute to efforts for implementing the filter on 
sorted data and optimized sort merge join strategies mentioned in the attached 
BucketedTables.pdf? Looking forward to these features!

> Support writing out pre-hash-partitioned data and exploit that in join 
> optimizations to avoid shuffle (i.e. bucketing in Hive)
> --
>
> Key: SPARK-12394
> URL: https://issues.apache.org/jira/browse/SPARK-12394
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Nong Li
> Fix For: 2.0.0
>
> Attachments: BucketedTables.pdf
>
>
> In many cases users know ahead of time the columns that they will be joining 
> or aggregating on.  Ideally they should be able to leverage this information 
> and pre-shuffle the data so that subsequent queries do not require a shuffle. 
>  Hive supports this functionality by allowing the user to define buckets, 
> which are hash partitioning of the data based on some key.
>  - Allow the user to specify a set of columns when caching or writing out data
>  - Allow the user to specify some parallelism
>  - Shuffle the data when writing / caching such that it's distributed by these 
> columns
>  - When planning/executing a query, use this distribution to avoid another 
> shuffle when reading, assuming the join or aggregation is compatible with the 
> columns specified
>  - Should work with existing save modes: append, overwrite, etc.
>  - Should work at least with all Hadoop FS data sources
>  - Should work with any data source when caching


