[jira] [Commented] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)
[ https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17053715#comment-17053715 ]

Alexander Tronchin-James commented on SPARK-29367:
--------------------------------------------------

The fix suggested above, adding ARROW_PRE_0_15_IPC_FORMAT=1 in SPARK_HOME/conf/spark-env.sh, did NOT resolve the issue for me. Instead, I needed to set this environment variable in my Python environment, as described here:
https://stackoverflow.com/questions/58269115/how-to-enable-apache-arrow-in-pyspark/58273294#58273294

> pandas udf not working with latest pyarrow release (0.15.0)
> -----------------------------------------------------------
>
>                 Key: SPARK-29367
>                 URL: https://issues.apache.org/jira/browse/SPARK-29367
>             Project: Spark
>          Issue Type: Documentation
>          Components: PySpark
>    Affects Versions: 2.4.0, 2.4.1, 2.4.3
>            Reporter: Julien Peloton
>            Assignee: Bryan Cutler
>            Priority: Major
>             Fix For: 2.4.5, 3.0.0
>
> Hi,
> I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my
> pyspark jobs using pandas udf are failing with
> java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and
> 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import BooleanType
> import pandas as pd
>
> @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
> def qualitycuts(nbad: pd.Series, rb: pd.Series, magdiff: pd.Series) -> pd.Series:
>     """Apply simple quality cuts.
>
>     Returns
>     -------
>     out: pandas.Series of booleans
>         One flag per alert: False for a bad alert, True for a good one.
>     """
>     mask = nbad.values == 0
>     mask &= rb.values >= 0.55
>     mask &= abs(magdiff.values) <= 0.1
>     return pd.Series(mask)
>
> spark = SparkSession.builder.getOrCreate()
>
> # Create dummy DF
> colnames = ["nbad", "rb", "magdiff"]
> df = spark.sparkContext.parallelize(
>     zip(
>         [0, 1, 0, 0],
>         [0.01, 0.02, 0.6, 0.01],
>         [0.02, 0.05, 0.1, 0.01]
>     )
> ).toDF(colnames)
> df.show()
>
> # Apply cuts
> df = df\
>     .withColumn("toKeep", qualitycuts(*colnames))\
>     .filter("toKeep == true")\
>     .drop("toKeep")
>
> # This will fail if latest pyarrow 0.15.0 is used
> df.show()
> {code}
> and the log is:
> {code}
> Driver stacktrace:
> 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
> Traceback (most recent call last):
>   File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in
>     df.show()
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
> 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
> 	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
> 	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
> 	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
> 	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
> 	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
> 	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.(ArrowEvalPythonExec.scala:98)
> 	at
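The workaround the commenter describes can be sketched as follows. This is a hedged illustration, not the exact code from the linked answer: the key point is that the flag must be visible to the Python worker processes on the executors, so it is set in the driver's environment and also forwarded via a `spark.executorEnv.*` setting (shown here as a plain dict that a real job would pass through `SparkSession.builder.config(...)`; `arrow_legacy_ipc_enabled` is a hypothetical helper for checking both sides).

```python
import os

# Assumption: pyarrow >= 0.15 against Spark 2.4.x. The legacy Arrow IPC
# flag must reach both the driver and the executors' Python workers.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

# Illustrative config a real job would pass via SparkSession.builder.config(...)
executor_conf = {
    "spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT": "1",
}

def arrow_legacy_ipc_enabled(env=os.environ, conf=executor_conf):
    """True only when both the driver env and the executor env will see the flag."""
    return (env.get("ARROW_PRE_0_15_IPC_FORMAT") == "1"
            and conf.get("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT") == "1")
```

Setting the variable only in `spark-env.sh` can miss the Python workers, which is consistent with the behaviour reported in the comment above.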
[jira] [Created] (SPARK-29577) Implement p-value simulation and unit tests for chi2 test
Alexander Tronchin-James created SPARK-29577:
--------------------------------------------

             Summary: Implement p-value simulation and unit tests for chi2 test
                 Key: SPARK-29577
                 URL: https://issues.apache.org/jira/browse/SPARK-29577
             Project: Spark
          Issue Type: Improvement
          Components: ML, MLlib
    Affects Versions: 2.4.5, 3.0.0
            Reporter: Alexander Tronchin-James

Spark MLlib's chi-squared test does not yet include p-value simulation for the goodness-of-fit test. Implementing a robust, scalable approach was non-trivial for us, so we wanted to give this work back to the community for others to use.

https://github.com/apache/spark/pull/26197

--
This message was sent by Atlassian Jira (v8.3.4#803005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
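For readers unfamiliar with the technique, here is a hedged, pure-Python sketch of Monte Carlo p-value simulation for the chi-squared goodness-of-fit test. It illustrates the general idea only and is not the code from the linked pull request; the simulation count and the +1 continuity correction are conventional choices, not Spark's.

```python
import random

def chi2_stat(observed, expected):
    """Pearson's chi-squared statistic: sum over cells of (O - E)^2 / E."""
    return sum((o - e) ** 2 / e for o, e in zip(observed, expected))

def simulated_pvalue(observed, probs, n_sim=2000, seed=42):
    """Monte Carlo p-value for a goodness-of-fit test: draw n_sim datasets
    from the null multinomial and count how often the simulated statistic
    is at least as extreme as the observed one."""
    rng = random.Random(seed)
    n = sum(observed)
    expected = [p * n for p in probs]
    stat = chi2_stat(observed, expected)
    hits = 0
    for _ in range(n_sim):
        counts = [0] * len(probs)
        for _ in range(n):  # one multinomial draw, sample by sample
            r = rng.random()
            acc = 0.0
            for i, p in enumerate(probs):
                acc += p
                if r < acc:
                    counts[i] += 1
                    break
            else:
                counts[-1] += 1  # guard against floating-point shortfall
        if chi2_stat(counts, expected) >= stat:
            hits += 1
    # +1 correction keeps the estimate away from an impossible p of zero
    return (hits + 1) / (n_sim + 1)
```

A perfectly uniform sample against a uniform null yields a p-value of 1, while a heavily skewed one yields a p-value near 1/(n_sim + 1).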
[jira] [Commented] (SPARK-14543) SQL/Hive insertInto has unexpected results
[ https://issues.apache.org/jira/browse/SPARK-14543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888246#comment-16888246 ]

Alexander Tronchin-James commented on SPARK-14543:
--------------------------------------------------

OK, thanks!

> SQL/Hive insertInto has unexpected results
> ------------------------------------------
>
>                 Key: SPARK-14543
>                 URL: https://issues.apache.org/jira/browse/SPARK-14543
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Ryan Blue
>            Assignee: Ryan Blue
>            Priority: Major
>
> *Updated description*
> There should be an option to match input data to output columns by name. The API allows operations on tables, which hide the column resolution problem. It's easy to copy from one table to another without listing the columns, and in the API it is common to work with columns by name rather than by position. I think the API should add a way to match columns by name, which is closer to what users expect. I propose adding something like this:
> {code}
> CREATE TABLE src (id: bigint, count: int, total: bigint)
> CREATE TABLE dst (id: bigint, total: bigint, count: int)
> sqlContext.table("src").write.byName.insertInto("dst")
> {code}
>
> *Original description*
> The Hive write path adds a pre-insertion cast (projection) to reconcile incoming data columns with the outgoing table schema. Columns are matched by position and casts are inserted to reconcile the two column schemas.
> When columns aren't correctly aligned, this causes unexpected results. I ran into this by not using a correct {{partitionBy}} call (addressed by SPARK-14459), which caused an error message that an int could not be cast to an array. However, if the columns are vaguely compatible, for example string and float, then no error or warning is produced and data is written to the wrong columns using unexpected casts (string -> bigint -> float).
> A real-world use case that will hit this is when a table definition changes by adding a column in the middle of a table. Spark SQL statements that copy from that table to a destination table will then map the columns differently but insert casts that mask the problem. The last column's data will be dropped without a reliable warning for the user.
> This highlights a few problems:
> * Too many or too few incoming data columns should cause an AnalysisException to be thrown
> * Only "safe" casts should be inserted automatically, like int -> long, using UpCast
> * Pre-insertion casts currently ignore extra columns by using zip
> * The pre-insertion cast logic differs between Hive's MetastoreRelation and LogicalRelation
> Also, I think there should be an option to match input data to output columns by name, as proposed in the updated description.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
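The by-position vs. by-name distinction in the quoted description can be sketched without Spark at all. These are hypothetical helpers for illustration, not Spark APIs; in practice the usual workaround is to reorder the source columns to the destination schema yourself, e.g. `df.select(*dst_cols).write.insertInto("dst")`.

```python
def insert_by_position(rows, source_cols, target_cols):
    """What insertInto does today: values are matched purely by position,
    so a target schema with reordered columns silently receives the wrong
    values (source_cols is ignored, just as the names are at write time)."""
    return [dict(zip(target_cols, row)) for row in rows]

def insert_by_name(rows, source_cols, target_cols):
    """The proposed byName behaviour: match source columns to target
    columns by name before writing."""
    idx = [source_cols.index(c) for c in target_cols]
    return [dict(zip(target_cols, (row[i] for i in idx))) for row in rows]
```

With `src (id, count, total)` and `dst (id, total, count)` as in the ticket, by-position writes `count` into `total` and vice versa, while by-name lands each value in the right column.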
[jira] [Commented] (SPARK-14543) SQL/Hive insertInto has unexpected results
[ https://issues.apache.org/jira/browse/SPARK-14543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887508#comment-16887508 ]

Alexander Tronchin-James commented on SPARK-14543:
--------------------------------------------------

Did the calling syntax change for this? I'm using 2.4.x and can't find anything about .byName on writers in the docs, but maybe I'm just bad at searching the docs...

> SQL/Hive insertInto has unexpected results
> ------------------------------------------
>
>                 Key: SPARK-14543
>                 URL: https://issues.apache.org/jira/browse/SPARK-14543
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Ryan Blue
>            Assignee: Ryan Blue
>            Priority: Major
[jira] [Commented] (SPARK-18829) Printing to logger
[ https://issues.apache.org/jira/browse/SPARK-18829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16887450#comment-16887450 ]

Alexander Tronchin-James commented on SPARK-18829:
--------------------------------------------------

FWIW, the showString method on Datasets is private, so it doesn't seem possible to call it except from internal Dataset methods.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L295

> Printing to logger
> ------------------
>
>                 Key: SPARK-18829
>                 URL: https://issues.apache.org/jira/browse/SPARK-18829
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.6.2
>        Environment: ALL
>            Reporter: David Hodeffi
>            Priority: Trivial
>              Labels: easyfix, patch
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> I would like to print dataframe.show or df.explain(true) into a log file. Right now the code prints to standard output without a way to redirect it, and it cannot be configured in log4j.properties. My suggestion is to write to both the logger and standard output, i.e.
> {code}
> class DataFrame {
>   ...
>   override def explain(extended: Boolean): Unit = {
>     val explain = ExplainCommand(queryExecution.logical, extended = extended)
>     sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
>       // scalastyle:off println
>       r => {
>         println(r.getString(0))
>         logger.debug(r.getString(0))
>       }
>       // scalastyle:on println
>     }
>   }
>
>   def show(numRows: Int, truncate: Boolean): Unit = {
>     val str = showString(numRows, truncate)
>     println(str)
>     logger.debug(str)
>   }
> }
> {code}
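Since showString is private, a common user-side workaround is to capture the stdout that show()/explain() produce and re-emit it through a logger. A hedged PySpark-flavoured sketch (`log_show` is a hypothetical helper, and `FakeDF` stands in for a real DataFrame so the snippet is self-contained; the same idea works on the Scala side with Console.withOut):

```python
import io
import logging
from contextlib import redirect_stdout

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("df-show")

def log_show(df):
    """Capture whatever df.show() prints on stdout, forward it to the
    logger, and return the captured text."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.show()
    text = buf.getvalue()
    logger.debug(text)
    return text

class FakeDF:
    """Stand-in for a DataFrame: show() just prints a tiny ASCII table."""
    def show(self):
        print("+---+\n| 42|\n+---+")
```

The same wrapper works unchanged for `df.explain()`, since it also prints to stdout.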
[jira] [Commented] (SPARK-12394) Support writing out pre-hash-partitioned data and exploit that in join optimizations to avoid shuffle (i.e. bucketing in Hive)
[ https://issues.apache.org/jira/browse/SPARK-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438461#comment-15438461 ]

Alexander Tronchin-James commented on SPARK-12394:
--------------------------------------------------

Awesome news, Tejas! The filter feature is secondary AFAIK, and I'd prioritize the sorted-merge bucketed-map (SMB) join if I had the choice.

Strong preference for supporting all of inner and left/right/full outer joins between tables whose bucket counts differ by an integer multiple, for selecting the number of executors (a rational multiple or fraction of the number of buckets), and for selecting the number of emitted buckets. Bonus points for an implementation that automatically applies SMB joins and avoids re-sorts where possible. Maybe a tall order, but we know it can be done. ;-) If we don't get it all in the first pull request we can always iterate.

Thanks for pushing on this!

> Support writing out pre-hash-partitioned data and exploit that in join
> optimizations to avoid shuffle (i.e. bucketing in Hive)
> ----------------------------------------------------------------------
>
>                 Key: SPARK-12394
>                 URL: https://issues.apache.org/jira/browse/SPARK-12394
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Nong Li
>             Fix For: 2.0.0
>
>         Attachments: BucketedTables.pdf
>
> In many cases users know ahead of time the columns that they will be joining or aggregating on. Ideally they should be able to leverage this information and pre-shuffle the data so that subsequent queries do not require a shuffle. Hive supports this functionality by allowing the user to define buckets, which are hash partitions of the data based on some key.
> - Allow the user to specify a set of columns when caching or writing out data
> - Allow the user to specify some parallelism
> - Shuffle the data when writing / caching such that it is distributed by these columns
> - When planning/executing a query, use this distribution to avoid another shuffle when reading, assuming the join or aggregation is compatible with the columns specified
> - Should work with existing save modes: append, overwrite, etc.
> - Should work at least with all Hadoop FS data sources
> - Should work with any data source when caching

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
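The sorted-merge join the comment asks for boils down to a single linear pass over two inputs that are already sorted (and co-bucketed) on the join key, with no shuffle or hash table. A hedged pure-Python sketch of the inner-join core, for illustration only (not Spark's SortMergeJoinExec):

```python
def sort_merge_join(left, right):
    """Inner join of two lists of (key, value) pairs, each already sorted
    by key. Advances two cursors in lockstep; equal-key runs on both sides
    produce their cross product, as a sort-merge join must."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # find the extent of this key's run on each side
            i2 = i
            while i2 < len(left) and left[i2][0] == lk:
                i2 += 1
            j2 = j
            while j2 < len(right) and right[j2][0] == lk:
                j2 += 1
            for a in range(i, i2):
                for b in range(j, j2):
                    out.append((lk, left[a][1], right[b][1]))
            i, j = i2, j2
    return out
```

Outer variants differ only in emitting the unmatched rows instead of skipping them, which is why supporting all of inner/left/right/full on top of the same merge loop is plausible.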
[jira] [Commented] (SPARK-12394) Support writing out pre-hash-partitioned data and exploit that in join optimizations to avoid shuffle (i.e. bucketing in Hive)
[ https://issues.apache.org/jira/browse/SPARK-12394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431322#comment-15431322 ]

Alexander Tronchin-James commented on SPARK-12394:
--------------------------------------------------

Where can we read about, or contribute to, efforts to implement the filter-on-sorted-data and optimized sort-merge join strategies mentioned in the attached BucketedTables.pdf? Looking forward to these features!

> Support writing out pre-hash-partitioned data and exploit that in join
> optimizations to avoid shuffle (i.e. bucketing in Hive)
> ----------------------------------------------------------------------
>
>                 Key: SPARK-12394
>                 URL: https://issues.apache.org/jira/browse/SPARK-12394
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Assignee: Nong Li
>             Fix For: 2.0.0
>
>         Attachments: BucketedTables.pdf
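Bucketing itself is just a deterministic hash of the key modulo the bucket count, applied identically at write time to every table that will join later, so equal keys land in the same-numbered bucket of each table. A hedged sketch (crc32 stands in here for Spark's Murmur3-based hash; what matters is that every writer uses the same function). It also shows why integer-multiple bucket counts stay join-compatible: when k divides nk, h mod k is fully determined by h mod nk.

```python
import zlib

def bucket_of(key, num_buckets):
    """Deterministic bucket id for a key. crc32 is an illustrative
    stand-in for Spark's hash; determinism is the only requirement."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_buckets

def bucketize(rows, key_fn, num_buckets):
    """Split rows into num_buckets lists by hashed key, as a bucketed
    table write would."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(key_fn(row), num_buckets)].append(row)
    return buckets
```

Two tables bucketed this way on their join key can be joined bucket-by-bucket with no shuffle, and a 16-bucket table lines up with an 8-bucket one because each fine bucket b maps to coarse bucket b mod 8.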