[jira] [Commented] (SPARK-17896) Dataset groupByKey + reduceGroups fails with codegen-related exception

2016-11-28 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703320#comment-15703320
 ] 

Andrew Ray commented on SPARK-17896:


The given code seems to work in 2.0.2

> Dataset groupByKey + reduceGroups fails with codegen-related exception
> --
>
> Key: SPARK-17896
> URL: https://issues.apache.org/jira/browse/SPARK-17896
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: Databricks, MacOS
>Reporter: Adam Breindel
>
> possible regression: works on 2.0, fails on 2.0.1
> following code raises exception related to wholestage codegen:
> case class Zip(city:String, zip:String, state:String)
> val z1 = Zip("New York", "1", "NY")
> val z2 = Zip("New York", "10001", "NY")
> val z3 = Zip("Chicago", "60606", "IL")
> val zips = sc.parallelize(Seq(z1, z2, z3)).toDS
> zips.groupByKey(_.state).reduceGroups((z1, z2) => Zip("*", z1.zip + " " + 
> z2.zip, z1.state)).show






[jira] [Commented] (SPARK-11705) Eliminate unnecessary Cartesian Join

2016-12-02 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715675#comment-15715675
 ] 

Andrew Ray commented on SPARK-11705:


The above example does not produce a cartesian product in Spark 2.0.2.

> Eliminate unnecessary Cartesian Join
> 
>
> Key: SPARK-11705
> URL: https://issues.apache.org/jira/browse/SPARK-11705
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Zhan Zhang
>
> When we have some queries similar to the following (don't remember the exact 
> form):
> select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 
> = d.key3
> There will be a cartesian join between a and b. But if we simply change 
> the table order, for example to a, c, b, d, such cartesian joins are 
> eliminated.
> Without such manual tuning, the query will never finish if a and b are big. But 
> we should not rely on such manual optimization.






[jira] [Commented] (SPARK-18717) Datasets - crash (compile exception) when mapping to immutable scala map

2016-12-05 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15723499#comment-15723499
 ] 

Andrew Ray commented on SPARK-18717:


Use `scala.collection.Map` as the type in your case class definition (instead 
of the default `scala.collection.immutable.Map`). Supported Scala types are 
listed in the docs here: 
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types 
although this should probably still be fixed, or at least produce a better 
error/warning.
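
For illustration, a minimal sketch of the suggested workaround (not part of the 
original comment; table and column names are taken from the report below, and a 
spark-shell session with {{spark.implicits._}} in scope is assumed):

{code}
// Declare the field as scala.collection.Map instead of the default immutable
// Map so the value produced by the row decoder matches the constructor type.
import scala.collection.Map

case class Test(id: String, map_test: Map[Long, String])

spark.sql("SELECT * FROM xyz.map_test").as[Test].map(t => t).collect()
{code}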

> Datasets - crash (compile exception) when mapping to immutable scala map
> 
>
> Key: SPARK-18717
> URL: https://issues.apache.org/jira/browse/SPARK-18717
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.2
>Reporter: Damian Momot
>
> {code}
> val spark: SparkSession = ???
> case class Test(id: String, map_test: Map[Long, String])
> spark.sql("CREATE TABLE xyz.map_test (id string, map_test map) 
> STORED AS PARQUET")
> spark.sql("SELECT * FROM xyz.map_test").as[Test].map(t => t).collect()
> {code}
> {code}
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 307, Column 108: No applicable constructor/method found for actual parameters 
> "java.lang.String, scala.collection.Map"; candidates are: 
> "$line14.$read$$iw$$iw$Test(java.lang.String, scala.collection.immutable.Map)"
> {code}






[jira] [Commented] (SPARK-18717) Datasets - crash (compile exception) when mapping to immutable scala map

2016-12-05 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15723527#comment-15723527
 ] 

Andrew Ray commented on SPARK-18717:


I have a fix for this, will make a PR in a bit

> Datasets - crash (compile exception) when mapping to immutable scala map
> 
>
> Key: SPARK-18717
> URL: https://issues.apache.org/jira/browse/SPARK-18717
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.2
>Reporter: Damian Momot
>
> {code}
> val spark: SparkSession = ???
> case class Test(id: String, map_test: Map[Long, String])
> spark.sql("CREATE TABLE xyz.map_test (id string, map_test map) 
> STORED AS PARQUET")
> spark.sql("SELECT * FROM xyz.map_test").as[Test].map(t => t).collect()
> {code}
> {code}
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 307, Column 108: No applicable constructor/method found for actual parameters 
> "java.lang.String, scala.collection.Map"; candidates are: 
> "$line14.$read$$iw$$iw$Test(java.lang.String, scala.collection.immutable.Map)"
> {code}






[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

2016-12-08 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15733247#comment-15733247
 ] 

Andrew Ray commented on SPARK-17859:


this appears to be fixed in 2.0.2

{code}
scala> df1.persist.join(df2 , $"id1" === $"id2" ).explain
== Physical Plan ==
*BroadcastHashJoin [id1#3L], [id2#9L], Inner, BuildRight
:- InMemoryTableScan [id1#3L]
:  :  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
:  : :  +- *Project [id#0L AS id1#3L]
:  : : +- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Project [id#6L AS id2#9L]
  +- *Range (0, 1000, step=1, splits=Some(8))
{code}

> persist should not impede with spark's ability to perform a broadcast join.
> ---
>
> Key: SPARK-17859
> URL: https://issues.apache.org/jira/browse/SPARK-17859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.0
> Environment: spark 2.0.0 , Linux RedHat
>Reporter: Franck Tago
>
> I am using Spark 2.0.0 
> My investigation leads me to conclude that calling persist could prevent a 
> broadcast join from happening.
> Example
> Case1: No persist call 
> var  df1 =spark.range(100).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
>  var df2 =spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
>  df1.join(df2 , $"id1" === $"id2" ).explain 
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 100, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]))
>+- *Project [id#120L AS id2#123L]
>   +- *Range (0, 1000, splits=2)
> Case 2:  persist call 
>  df1.persist.join(df2 , $"id1" === $"id2" ).explain 
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> : +- InMemoryTableScan [id1#3L]
> ::  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> :: :  +- *Project [id#0L AS id1#3L]
> :: : +- *Range (0, 100, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>+- Exchange hashpartitioning(id2#9L, 10)
>   +- InMemoryTableScan [id2#9L]
>  :  +- InMemoryRelation [id2#9L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
>  : :  +- *Project [id#6L AS id2#9L]
>  : : +- *Range (0, 1000, splits=2)
> Why does the persist call prevent the broadcast join? 
> My opinion is that it should not.
> I was made aware that the persist call is lazy and that might have something 
> to do with it, but I still contend that it should not. 
> Losing broadcast joins is really costly.






[jira] [Created] (SPARK-18845) PageRank has incorrect initialization value that leads to slow convergence

2016-12-13 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-18845:
--

 Summary: PageRank has incorrect initialization value that leads to 
slow convergence
 Key: SPARK-18845
 URL: https://issues.apache.org/jira/browse/SPARK-18845
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 2.0.2, 1.6.3, 1.5.2, 1.4.1, 1.3.1, 1.2.2
Reporter: Andrew Ray


All variants of PageRank in GraphX have an incorrect initialization value that 
leads to slow convergence. In the current implementations, ranks are seeded with 
the reset probability when they should be 1. This appears to have been 
introduced a long time ago in 
https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90

This also hides the fact that source vertices (vertices with no incoming edges) 
are not updated. This is because source vertices generally* have a pagerank 
equal to the reset probability. Therefore both need to be fixed at once.

PR will be added shortly

*when there are no sinks -- but that's a separate bug






[jira] [Created] (SPARK-18847) PageRank gives incorrect results for graphs with sinks

2016-12-13 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-18847:
--

 Summary: PageRank gives incorrect results for graphs with sinks
 Key: SPARK-18847
 URL: https://issues.apache.org/jira/browse/SPARK-18847
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 2.0.2, 1.6.3, 1.5.2, 1.4.1, 1.3.1, 1.2.2, 1.1.1, 1.0.2
Reporter: Andrew Ray


Sink vertices (those with no outgoing edges) should evenly distribute their 
rank to the entire graph but in the current implementation it is just lost.
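
An illustrative sketch of the problem (not from the original report; it assumes 
a spark-shell session with GraphX available):

{code}
import org.apache.spark.graphx.{Edge, Graph}

// A two-vertex graph where vertex 2 is a sink (it has no outgoing edges).
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0)))
val graph = Graph.fromEdges(edges, defaultValue = 1.0)

// If the sink's rank were redistributed over the whole graph, the ranks would
// sum to roughly the number of vertices; with the behavior described above,
// part of that mass is simply dropped.
val ranks = graph.pageRank(0.0001).vertices
println(ranks.map(_._2).sum())
{code}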






[jira] [Created] (SPARK-18848) PageRank gives incorrect results for graphs with sinks

2016-12-13 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-18848:
--

 Summary: PageRank gives incorrect results for graphs with sinks
 Key: SPARK-18848
 URL: https://issues.apache.org/jira/browse/SPARK-18848
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 2.0.2, 1.6.3, 1.5.2, 1.4.1, 1.3.1, 1.2.2, 1.1.1, 1.0.2
Reporter: Andrew Ray


Sink vertices (those with no outgoing edges) should evenly distribute their 
rank to the entire graph but in the current implementation it is just lost.






[jira] [Commented] (SPARK-18845) PageRank has incorrect initialization value that leads to slow convergence

2016-12-13 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15746385#comment-15746385
 ] 

Andrew Ray commented on SPARK-18845:


[~srowen] No, that's a different thing: just whether the result sums to 1 or n. 
But to expand on that, since we are using the version that sums to n, our 
initial ranks need to sum to n or it takes a lot longer to converge. 
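
A small back-of-the-envelope illustration of that point (not from the original 
comment):

{code}
// With the formulation whose fixed point sums to n (the number of vertices),
// seeding every rank with 1.0 starts the iteration at the correct total mass,
// while seeding with the reset probability starts far below it.
val resetProb = 0.15
val n = 1000000L
val massSeededAtReset = n * resetProb  // 150000.0
val massSeededAtOne   = n * 1.0        // 1000000.0, already matches the fixed point's sum
{code}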

> PageRank has incorrect initialization value that leads to slow convergence
> --
>
> Key: SPARK-18845
> URL: https://issues.apache.org/jira/browse/SPARK-18845
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.2.2, 1.3.1, 1.4.1, 1.5.2, 1.6.3, 2.0.2
>Reporter: Andrew Ray
>
> All variants of PageRank in GraphX have an incorrect initialization value that 
> leads to slow convergence. In the current implementations, ranks are seeded 
> with the reset probability when they should be 1. This appears to have been 
> introduced a long time ago in 
> https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90
> This also hides the fact that source vertices (vertices with no incoming 
> edges) are not updated. This is because source vertices generally* have a 
> pagerank equal to the reset probability. Therefore both need to be fixed at 
> once.
> PR will be added shortly
> *when there are no sinks -- but that's a separate bug






[jira] [Commented] (SPARK-18847) PageRank gives incorrect results for graphs with sinks

2016-12-13 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15746395#comment-15746395
 ] 

Andrew Ray commented on SPARK-18847:


I have, and have not found anything relevant. I'm currently working on a fix.

> PageRank gives incorrect results for graphs with sinks
> --
>
> Key: SPARK-18847
> URL: https://issues.apache.org/jira/browse/SPARK-18847
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.0.2, 1.1.1, 1.2.2, 1.3.1, 1.4.1, 1.5.2, 1.6.3, 2.0.2
>Reporter: Andrew Ray
>
> Sink vertices (those with no outgoing edges) should evenly distribute their 
> rank to the entire graph but in the current implementation it is just lost.






[jira] [Commented] (SPARK-18393) DataFrame pivot output column names should respect aliases

2016-12-27 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15781473#comment-15781473
 ] 

Andrew Ray commented on SPARK-18393:


It wouldn't hurt to backport to 2.0; it's a pretty simple fix.

> DataFrame pivot output column names should respect aliases
> --
>
> Key: SPARK-18393
> URL: https://issues.apache.org/jira/browse/SPARK-18393
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Eric Liang
>Priority: Minor
>
> For example
> {code}
> val df = spark.range(100).selectExpr("id % 5 as x", "id % 2 as a", "id as b")
> df
>   .groupBy('x)
>   .pivot("a", Seq(0, 1))
>   .agg(expr("sum(b)").as("blah"), expr("count(b)").as("foo"))
>   .show()
> +---+--------------------+---------------------+--------------------+---------------------+
> |  x|0_sum(`b`) AS `blah`|0_count(`b`) AS `foo`|1_sum(`b`) AS `blah`|1_count(`b`) AS `foo`|
> +---+--------------------+---------------------+--------------------+---------------------+
> |  0|                 450|                   10|                 500|                   10|
> |  1|                 510|                   10|                 460|                   10|
> |  3|                 530|                   10|                 480|                   10|
> |  2|                 470|                   10|                 520|                   10|
> |  4|                 490|                   10|                 540|                   10|
> +---+--------------------+---------------------+--------------------+---------------------+
> {code}
> The column names here are quite hard to read. Ideally we would respect the 
> aliases and generate column names like 0_blah, 0_foo, 1_blah, 1_foo instead.






[jira] [Created] (SPARK-18457) ORC and other columnar formats using HiveShim read all columns when doing a simple count

2016-11-15 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-18457:
--

 Summary: ORC and other columnar formats using HiveShim read all 
columns when doing a simple count
 Key: SPARK-18457
 URL: https://issues.apache.org/jira/browse/SPARK-18457
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.2, 1.6.3
 Environment: Hadoop 2.7.0
Reporter: Andrew Ray
Priority: Minor


Doing a `select count(*) from a_orc_table` reads all columns and is thus slower 
than a query that selects a single column, like `select count(a_column) from 
a_orc_table`. The data read can be seen in the UI (it appears to only be 
accurate for Hadoop 2.5+, based on the comment in FileScanRDD.scala line 80).
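
A sketch of the comparison being described (not from the original report; it 
assumes an existing ORC-backed table named a_orc_table with a column a_column):

{code}
// Compare the "Input Size" reported in the Spark UI for these two jobs;
// ideally the first should read no more data than the second.
spark.sql("select count(*) from a_orc_table").show()
spark.sql("select count(a_column) from a_orc_table").show()
{code}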

I will create a PR shortly.






[jira] [Created] (SPARK-20769) Incorrect documentation for using Jupyter notebook

2017-05-16 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-20769:
--

 Summary: Incorrect documentation for using Jupyter notebook
 Key: SPARK-20769
 URL: https://issues.apache.org/jira/browse/SPARK-20769
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Affects Versions: 2.1.1
Reporter: Andrew Ray
Priority: Minor


SPARK-13973 incorrectly removed the required 
PYSPARK_DRIVER_PYTHON_OPTS="notebook" from the documentation for using pyspark 
with a Jupyter notebook.






[jira] [Commented] (SPARK-20839) Incorrect Dynamic PageRank calculation

2017-06-14 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049214#comment-16049214
 ] 

Andrew Ray commented on SPARK-20839:


1 & 2 work together to do the algorithm properly with an active set, this is by 
design.
3 although it may look wrong, the deltas (and thus messages) will always be 
positive so this is not a problem.

> Incorrect Dynamic PageRank calculation
> --
>
> Key: SPARK-20839
> URL: https://issues.apache.org/jira/browse/SPARK-20839
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.1.1
>Reporter: BahaaEddin AlAila
>
> Correct me if I am wrong
> I think there are three places where the pagerank calculation is incorrect
> 1st) in the VertexProgram (line 318 of PageRank.scala in spark 2.1.1)
> val newPR = oldPR + (1.0 - resetProb) * msgSum
> it should be
> val newPR = resetProb + (1.0 - resetProb) * msgSum
> 2nd) in the message sending part (line 336 of the same file)
> Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
> should be 
> Iterator((edge.dstId, edge.srcAttr._1 * edge.attr))
> as we should be sending the edge weight multiplied by the current pagerank of 
> the source vertex (not the vertex's delta)
> 3rd) the tol check against the abs of the delta (line 335)
>   if (edge.srcAttr._2 > tol) {
> should be
>   if (Math.abs(edge.srcAttr._2) > tol) {
>  






[jira] [Resolved] (SPARK-20839) Incorrect Dynamic PageRank calculation

2017-06-14 Thread Andrew Ray (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray resolved SPARK-20839.

Resolution: Not A Problem

> Incorrect Dynamic PageRank calculation
> --
>
> Key: SPARK-20839
> URL: https://issues.apache.org/jira/browse/SPARK-20839
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.1.1
>Reporter: BahaaEddin AlAila
>
> Correct me if I am wrong
> I think there are three places where the pagerank calculation is incorrect
> 1st) in the VertexProgram (line 318 of PageRank.scala in spark 2.1.1)
> val newPR = oldPR + (1.0 - resetProb) * msgSum
> it should be
> val newPR = resetProb + (1.0 - resetProb) * msgSum
> 2nd) in the message sending part (line 336 of the same file)
> Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
> should be 
> Iterator((edge.dstId, edge.srcAttr._1 * edge.attr))
> as we should be sending the edge weight multiplied by the current pagerank of 
> the source vertex (not the vertex's delta)
> 3rd) the tol check against the abs of the delta (line 335)
>   if (edge.srcAttr._2 > tol) {
> should be
>   if (Math.abs(edge.srcAttr._2) > tol) {
>  






[jira] [Created] (SPARK-21100) describe should give quartiles similar to Pandas

2017-06-14 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-21100:
--

 Summary: describe should give quartiles similar to Pandas
 Key: SPARK-21100
 URL: https://issues.apache.org/jira/browse/SPARK-21100
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.1
Reporter: Andrew Ray
Priority: Minor


The DataFrame describe method should also include quartiles (25th, 50th, and 
75th percentiles) like Pandas.

Example pandas output:
{code}
In [4]: df.describe()
Out[4]:
       Unnamed: 0      displ       year       cyl        cty        hwy
count      234.00     234.00     234.00    234.00     234.00     234.00
mean       117.50   3.471795    2003.50      5.89  16.858974  23.440171
std     67.694165   1.291959   4.509646  1.611534   4.255946   5.954643
min          1.00       1.60    1999.00      4.00       9.00      12.00
25%         59.25       2.40    1999.00      4.00      14.00      18.00
50%        117.50       3.30    2003.50      6.00      17.00      24.00
75%        175.75       4.60    2008.00      8.00      19.00      27.00
max        234.00       7.00    2008.00      8.00      35.00      44.00
{code}
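
For comparison, a rough sketch of how the requested quartiles can be computed 
with the existing Spark SQL functions (not from the original report; the 
DataFrame {{df}} and column name "x" are assumed):

{code}
import org.apache.spark.sql.functions._

df.select(
  count(col("x")), mean(col("x")), stddev(col("x")), min(col("x")),
  expr("percentile_approx(x, array(0.25, 0.5, 0.75))"),
  max(col("x"))
).show()
{code}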






[jira] [Created] (SPARK-21184) QuantileSummaries implementation is wrong and QuantileSummariesSuite fails with larger n

2017-06-22 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-21184:
--

 Summary: QuantileSummaries implementation is wrong and 
QuantileSummariesSuite fails with larger n
 Key: SPARK-21184
 URL: https://issues.apache.org/jira/browse/SPARK-21184
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.1
Reporter: Andrew Ray


1. QuantileSummaries implementation does not match the paper it is supposed to 
be based on.

1a. The compress method 
(https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala#L240)
 merges neighboring buckets, but that's not what the paper says to do. The paper 
(http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf) 
describes an implicit tree structure and the compress method deletes selected 
subtrees.

1b. The paper does not discuss merging these summary data structures at all. 
The following comment is in the merge method of QuantileSummaries:

{quote}  // The GK algorithm is a bit unclear about it, but it seems there 
is no need to adjust the
  // statistics during the merging: the invariants are still respected 
after the merge.{quote}

Unless I'm missing something that needs substantiation, it's not clear that 
the invariants hold.

2. QuantileSummariesSuite fails with n = 1 (and other non trivial values)
https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala#L27

One possible solution if these issues can't be resolved would be to move to an 
algorithm that explicitly supports merging and is well tested like 
https://github.com/tdunning/t-digest







[jira] [Commented] (SPARK-21184) QuantileSummaries implementation is wrong and QuantileSummariesSuite fails with larger n

2017-06-28 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067167#comment-16067167
 ] 

Andrew Ray commented on SPARK-21184:


Also the lookup queries are just wrong

{code}
scala> Seq(1, 2).toDF("a").selectExpr("percentile_approx(a, 0.001)").head
res9: org.apache.spark.sql.Row = [2.0]
{code}


> QuantileSummaries implementation is wrong and QuantileSummariesSuite fails 
> with larger n
> 
>
> Key: SPARK-21184
> URL: https://issues.apache.org/jira/browse/SPARK-21184
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Andrew Ray
>
> 1. QuantileSummaries implementation does not match the paper it is supposed 
> to be based on.
> 1a. The compress method 
> (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala#L240)
> merges neighboring buckets, but that's not what the paper says to do. The 
> paper 
> (http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf) 
> describes an implicit tree structure and the compress method deletes selected 
> subtrees.
> 1b. The paper does not discuss merging these summary data structures at all. 
> The following comment is in the merge method of QuantileSummaries:
> {quote}  // The GK algorithm is a bit unclear about it, but it seems 
> there is no need to adjust the
>   // statistics during the merging: the invariants are still respected 
> after the merge.{quote}
> Unless I'm missing something that needs substantiation, it's not clear that 
> the invariants hold.
> 2. QuantileSummariesSuite fails with n = 1 (and other non trivial values)
> https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala#L27
> One possible solution if these issues can't be resolved would be to move to 
> an algorithm that explicitly supports merging and is well tested like 
> https://github.com/tdunning/t-digest






[jira] [Commented] (SPARK-11275) [SQL] Regression in rollup/cube

2015-10-29 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14981338#comment-14981338
 ] 

Andrew Ray commented on SPARK-11275:


I think that I understand what is happening here. Any expression involving one 
of the group by columns in a cube/rollup gets evaluated incorrectly. This is 
missed in unit testing since the tests only check that the DataFrame operations 
give the same result as SQL, but both are handled by the same incorrect logic. 
I'll put a PR together to fix it and provide better unit tests.
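
A minimal sketch of the kind of case being described (not part of the original 
comment; it mirrors the SQL in the report below and assumes a spark-shell 
session with the SQL implicits in scope):

{code}
import org.apache.spark.sql.functions.sum

val testData = Seq((1, 2), (2, 4)).toDF("a", "b")

// An expression over the grouping columns inside a rollup; per the report this
// produces wrong sums in 1.5.x even though the logical plans look correct.
testData.rollup($"a", $"b").agg(sum($"a" + $"b")).show()
{code}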

> [SQL] Regression in rollup/cube 
> 
>
> Key: SPARK-11275
> URL: https://issues.apache.org/jira/browse/SPARK-11275
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Xiao Li
>
> Spark SQL is unable to generate a correct result when the following query 
> using rollup. 
> "select a, b, sum(a + b) as sumAB, GROUPING__ID from mytable group by a, 
> b with rollup"
> Spark SQL generates a wrong result:
> [2,4,6,3]
> [2,null,null,1]
> [1,null,null,1]
> [null,null,null,0]
> [1,2,3,3]
> The table mytable is super simple, containing two rows and two columns:
> testData = Seq((1, 2), (2, 4)).toDF("a", "b")
> After turning off codegen, the query plan is like 
> == Parsed Logical Plan ==
> 'Rollup ['a,'b], 
> [unresolvedalias('a),unresolvedalias('b),unresolvedalias('sum(('a + 'b)) AS 
> sumAB#20),unresolvedalias('GROUPING__ID)]
>  'UnresolvedRelation `mytable`, None
> == Analyzed Logical Plan ==
> a: int, b: int, sumAB: bigint, GROUPING__ID: int
> Aggregate [a#2,b#3,grouping__id#23], [a#2,b#3,sum(cast((a#2 + b#3) as 
> bigint)) AS sumAB#20L,GROUPING__ID#23]
>  Expand [0,1,3], [a#2,b#3], grouping__id#23
>   Subquery mytable
>Project [_1#0 AS a#2,_2#1 AS b#3]
> LocalRelation [_1#0,_2#1], [[1,2],[2,4]]
> == Optimized Logical Plan ==
> Aggregate [a#2,b#3,grouping__id#23], [a#2,b#3,sum(cast((a#2 + b#3) as 
> bigint)) AS sumAB#20L,GROUPING__ID#23]
>  Expand [0,1,3], [a#2,b#3], grouping__id#23
>   LocalRelation [a#2,b#3], [[1,2],[2,4]]
> == Physical Plan ==
> Aggregate false, [a#2,b#3,grouping__id#23], [a#2,b#3,sum(PartialSum#24L) AS 
> sumAB#20L,grouping__id#23]
>  Exchange hashpartitioning(a#2,b#3,grouping__id#23,5)
>   Aggregate true, [a#2,b#3,grouping__id#23], 
> [a#2,b#3,grouping__id#23,sum(cast((a#2 + b#3) as bigint)) AS PartialSum#24L]
>Expand [List(null, null, 0),List(a#2, null, 1),List(a#2, b#3, 3)], 
> [a#2,b#3,grouping__id#23]
> LocalTableScan [a#2,b#3], [[1,2],[2,4]]
> Below are my observations:
> 1. Generation of GROUP__ID looks OK. 
> 2. The problem still exists no matter whether turning on/off CODEGEN
> 3. Rollup still works in a simple query when group-by columns have only one 
> column. For example, "select b, sum(a), GROUPING__ID from mytable group by b 
> with rollup"
> 4. The buckets in "HiveDataFrameAnalyticsSuite" are misleading. 
> Unfortunately, they hide the bugs. Although the buckets passed, they just 
> compare the results of SQL and Dataframe. This way is unable to capture the 
> regression when both return the same wrong results.  
> 5. The same problem also exists in cube. I have not started the investigation 
> in cube, but I believe the root causes should be the same. 
> 6. It looks like all the logical plans are correct.






[jira] [Updated] (SPARK-21100) Add summary method as alternative to describe that gives quartiles similar to Pandas

2017-07-05 Thread Andrew Ray (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray updated SPARK-21100:
---
Summary: Add summary method as alternative to describe that gives quartiles 
similar to Pandas  (was: describe should give quartiles similar to Pandas)

> Add summary method as alternative to describe that gives quartiles similar to 
> Pandas
> 
>
> Key: SPARK-21100
> URL: https://issues.apache.org/jira/browse/SPARK-21100
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Andrew Ray
>Priority: Minor
>
> The DataFrame describe method should also include quartiles (25th, 50th, and 
> 75th percentiles) like Pandas.
> Example pandas output:
> {code}
> In [4]: df.describe()
> Out[4]:
>        Unnamed: 0      displ       year       cyl        cty        hwy
> count      234.00     234.00     234.00    234.00     234.00     234.00
> mean       117.50   3.471795    2003.50      5.89  16.858974  23.440171
> std     67.694165   1.291959   4.509646  1.611534   4.255946   5.954643
> min          1.00       1.60    1999.00      4.00       9.00      12.00
> 25%         59.25       2.40    1999.00      4.00      14.00      18.00
> 50%        117.50       3.30    2003.50      6.00      17.00      24.00
> 75%        175.75       4.60    2008.00      8.00      19.00      27.00
> max        234.00       7.00    2008.00      8.00      35.00      44.00
> {code}






[jira] [Created] (SPARK-21566) Python method for summary

2017-07-28 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-21566:
--

 Summary: Python method for summary
 Key: SPARK-21566
 URL: https://issues.apache.org/jira/browse/SPARK-21566
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 2.3.0
Reporter: Andrew Ray


Add python method for summary that was added in SPARK-21100






[jira] [Created] (SPARK-21584) Update R method for summary to call new implementation

2017-07-31 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-21584:
--

 Summary: Update R method for summary to call new implementation
 Key: SPARK-21584
 URL: https://issues.apache.org/jira/browse/SPARK-21584
 Project: Spark
  Issue Type: Improvement
  Components: SparkR
Affects Versions: 2.3.0
Reporter: Andrew Ray


Follow up to SPARK-21100






[jira] [Updated] (SPARK-21584) Update R method for summary to call new implementation

2017-07-31 Thread Andrew Ray (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray updated SPARK-21584:
---
Component/s: SQL

> Update R method for summary to call new implementation
> --
>
> Key: SPARK-21584
> URL: https://issues.apache.org/jira/browse/SPARK-21584
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR, SQL
>Affects Versions: 2.3.0
>Reporter: Andrew Ray
>
> Follow up to SPARK-21100






[jira] [Commented] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp

2017-07-31 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107933#comment-16107933
 ] 

Andrew Ray commented on SPARK-21565:


I believe you need to use a window to group by your event time.
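
For illustration, a sketch of that suggestion against the code in the report 
below (not part of the original comment; names are taken from the reporter's 
example):

{code}
// Group on a window over the watermarked event-time column instead of the raw
// column, e.g. via SQL:
val groupEvents = sparkSession.sql(
  "select symbol, window(eventTime, '1 minute'), count(price) as count1 " +
  "from dfNewEvents2 group by symbol, window(eventTime, '1 minute')")
{code}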

> aggregate query fails with watermark on eventTime but works with watermark on 
> timestamp column generated by current_timestamp
> -
>
> Key: SPARK-21565
> URL: https://issues.apache.org/jira/browse/SPARK-21565
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Amit Assudani
>
> *Short Description: *
> Aggregation query fails with eventTime as watermark column while works with 
> newTimeStamp column generated by running SQL with current_timestamp,
> *Exception:*
> Caused by: java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
>   at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
>   at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
>   at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> *Code to replicate:*
> package test
> import java.nio.file.{Files, Path, Paths}
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{SparkSession}
> import scala.collection.JavaConverters._
> object Test1 {
>   def main(args: Array[String]) {
> val sparkSession = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("Spark SQL basic example")
>   .config("spark.some.config.option", "some-value")
>   .getOrCreate()
> val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
> val checkpointPath = "target/cp1"
> val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEventsPath)
> delete(Paths.get(checkpointPath).toAbsolutePath)
> Files.createDirectories(newEventsPath)
> val dfNewEvents= newEvents(sparkSession)
> dfNewEvents.createOrReplaceTempView("dfNewEvents")
> //The below works - Start
> //val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as 
> newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
> //dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> //val groupEvents = sparkSession.sql("select symbol,newTimeStamp, 
> count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
> // End
> 
> 
> //The below doesn't work - Start
> val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents 
> ").withWatermark("eventTime","2 seconds")
>  dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
>   val groupEvents = sparkSession.sql("select symbol,eventTime, 
> count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
> // - End
> 
> 
> val query1 = groupEvents.writeStream
>   .outputMode("append")
> .format("console")
>   .option("checkpointLocation", checkpointPath)
>   .start("./myop")
> val newEventFile1=newEventsPath.resolve("eventNew1.json")
> Files.write(newEventFile1, List(
>   """{"symbol": 
> "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
>   """{"symbol": 
> "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
> ).toIterable.asJava)
> query1.processAllAvailable()
> sparkSession.streams.awaitAnyTermination(1)
>   }
>   private def newEvents(sparkSession: SparkSession) = {
> val newEvents = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEvents)
> Files.createDirectories(newEvents)
> val dfNewEvents = 
> sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2
>  se

[jira] [Commented] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp

2017-07-31 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108006#comment-16108006
 ] 

Andrew Ray commented on SPARK-21565:


No, nothing like the limitations of microbatches. The window can be made 
trivially small if you want only one timestamp per group, for example 
{{window(eventTime, "1 microsecond")}}.

And yes, this should probably be checked in analysis if this is the intended 
limitation.

> aggregate query fails with watermark on eventTime but works with watermark on 
> timestamp column generated by current_timestamp
> -
>
> Key: SPARK-21565
> URL: https://issues.apache.org/jira/browse/SPARK-21565
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Amit Assudani
>
> *Short Description: *
> Aggregation query fails with eventTime as watermark column while works with 
> newTimeStamp column generated by running SQL with current_timestamp,
> *Exception:*
> Caused by: java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
>   at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
>   at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
>   at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> *Code to replicate:*
> package test
> import java.nio.file.{Files, Path, Paths}
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{SparkSession}
> import scala.collection.JavaConverters._
> object Test1 {
>   def main(args: Array[String]) {
> val sparkSession = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("Spark SQL basic example")
>   .config("spark.some.config.option", "some-value")
>   .getOrCreate()
> val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
> val checkpointPath = "target/cp1"
> val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEventsPath)
> delete(Paths.get(checkpointPath).toAbsolutePath)
> Files.createDirectories(newEventsPath)
> val dfNewEvents= newEvents(sparkSession)
> dfNewEvents.createOrReplaceTempView("dfNewEvents")
> //The below works - Start
> //val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as 
> newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
> //dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> //val groupEvents = sparkSession.sql("select symbol,newTimeStamp, 
> count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
> // End
> 
> 
> //The below doesn't work - Start
> val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents 
> ").withWatermark("eventTime","2 seconds")
>  dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
>   val groupEvents = sparkSession.sql("select symbol,eventTime, 
> count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
> // - End
> 
> 
> val query1 = groupEvents.writeStream
>   .outputMode("append")
> .format("console")
>   .option("checkpointLocation", checkpointPath)
>   .start("./myop")
> val newEventFile1=newEventsPath.resolve("eventNew1.json")
> Files.write(newEventFile1, List(
>   """{"symbol": 
> "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
>   """{"symbol": 
> "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
> ).toIterable.asJava)
> query1.processAllAvailable()
> sparkSession.streams.awaitAnyTermination(1)
>   }
>   private def newEvents(sparkSession: SparkSession) = {
> val newEvents = Paths.get("target/newEvents/").toAb

[jira] [Commented] (SPARK-21330) Bad partitioning does not allow to read a JDBC table with extreme values on the partition column

2017-08-01 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109568#comment-16109568
 ] 

Andrew Ray commented on SPARK-21330:


https://github.com/apache/spark/pull/18800

> Bad partitioning does not allow to read a JDBC table with extreme values on 
> the partition column
> 
>
> Key: SPARK-21330
> URL: https://issues.apache.org/jira/browse/SPARK-21330
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Stefano Parmesan
>
> When using "extreme" values in the partition column (like having a randomly 
> generated long number) overflow might happen, leading to the following 
> warning message:
> {code}WARN JDBCRelation: The number of partitions is reduced because the 
> specified number of partitions is less than the difference between upper 
> bound and lower bound. Updated number of partitions: -1559072469251914524; 
> Input number of partitions: 20; Lower bound: -7701345953623242445; Upper 
> bound: 9186325650834394647.{code}
> When this happens, no data is read from the table.
> This happens because of the following check in 
> {{org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala}}:
> {code}if ((upperBound - lowerBound) >= partitioning.numPartitions){code}
> Funny thing is that we worry about overflows a few lines later:
> {code}// Overflow and silliness can happen if you subtract then divide.
> // Here we get a little roundoff, but that's (hopefully) OK.{code}
> A better check would be:
> {code}if ((upperBound - partitioning.numPartitions) >= lowerBound){code}
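
An illustrative check of that overflow, using the exact bounds from the warning 
message above (not part of the original report):

{code}
val lowerBound = -7701345953623242445L
val upperBound = 9186325650834394647L
val numPartitions = 20

// The true difference (about 1.69e19) does not fit in a Long, so the
// subtraction wraps around to the negative value seen in the warning message.
println(upperBound - lowerBound)                     // -1559072469251914524
println((upperBound - lowerBound) >= numPartitions)  // false, so the partition count collapses
{code}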






[jira] [Commented] (SPARK-21110) Structs should be usable in inequality filters

2017-08-01 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109734#comment-16109734
 ] 

Andrew Ray commented on SPARK-21110:


I'm working on this

> Structs should be usable in inequality filters
> --
>
> Key: SPARK-21110
> URL: https://issues.apache.org/jira/browse/SPARK-21110
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nicholas Chammas
>Priority: Minor
>
> It seems like a missing feature that you can't compare structs in a filter on 
> a DataFrame.
> Here's a simple demonstration of a) where this would be useful and b) how 
> it's different from simply comparing each of the components of the structs.
> {code}
> import pyspark
> from pyspark.sql.functions import col, struct, concat
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
> [
> ('Boston', 'Bob'),
> ('Boston', 'Nick'),
> ('San Francisco', 'Bob'),
> ('San Francisco', 'Nick'),
> ],
> ['city', 'person']
> )
> pairs = (
> df.select(
> struct('city', 'person').alias('p1')
> )
> .crossJoin(
> df.select(
> struct('city', 'person').alias('p2')
> )
> )
> )
> print("Everything")
> pairs.show()
> print("Comparing parts separately (doesn't give me what I want)")
> (pairs
> .where(col('p1.city') < col('p2.city'))
> .where(col('p1.person') < col('p2.person'))
> .show())
> print("Comparing parts together with concat (gives me what I want but is 
> hacky)")
> (pairs
> .where(concat('p1.city', 'p1.person') < concat('p2.city', 'p2.person'))
> .show())
> print("Comparing parts together with struct (my desired solution but 
> currently yields an error)")
> (pairs
> .where(col('p1') < col('p2'))
> .show())
> {code}
> The last query yields the following error in Spark 2.1.1:
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve '(`p1` < `p2`)' due to 
> data type mismatch: '(`p1` < `p2`)' requires (boolean or tinyint or smallint 
> or int or bigint or float or double or decimal or timestamp or date or string 
> or binary) type, not struct;;
> 'Filter (p1#5 < p2#8)
> +- Join Cross
>:- Project [named_struct(city, city#0, person, person#1) AS p1#5]
>:  +- LogicalRDD [city#0, person#1]
>+- Project [named_struct(city, city#0, person, person#1) AS p2#8]
>   +- LogicalRDD [city#0, person#1]
> {code}






[jira] [Commented] (SPARK-21110) Structs should be usable in inequality filters

2017-08-02 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111093#comment-16111093
 ] 

Andrew Ray commented on SPARK-21110:


https://github.com/apache/spark/pull/18818

> Structs should be usable in inequality filters
> --
>
> Key: SPARK-21110
> URL: https://issues.apache.org/jira/browse/SPARK-21110
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Nicholas Chammas
>Priority: Minor
>
> It seems like a missing feature that you can't compare structs in a filter on 
> a DataFrame.
> Here's a simple demonstration of a) where this would be useful and b) how 
> it's different from simply comparing each of the components of the structs.
> {code}
> import pyspark
> from pyspark.sql.functions import col, struct, concat
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
> [
> ('Boston', 'Bob'),
> ('Boston', 'Nick'),
> ('San Francisco', 'Bob'),
> ('San Francisco', 'Nick'),
> ],
> ['city', 'person']
> )
> pairs = (
> df.select(
> struct('city', 'person').alias('p1')
> )
> .crossJoin(
> df.select(
> struct('city', 'person').alias('p2')
> )
> )
> )
> print("Everything")
> pairs.show()
> print("Comparing parts separately (doesn't give me what I want)")
> (pairs
> .where(col('p1.city') < col('p2.city'))
> .where(col('p1.person') < col('p2.person'))
> .show())
> print("Comparing parts together with concat (gives me what I want but is 
> hacky)")
> (pairs
> .where(concat('p1.city', 'p1.person') < concat('p2.city', 'p2.person'))
> .show())
> print("Comparing parts together with struct (my desired solution but 
> currently yields an error)")
> (pairs
> .where(col('p1') < col('p2'))
> .show())
> {code}
> The last query yields the following error in Spark 2.1.1:
> {code}
> org.apache.spark.sql.AnalysisException: cannot resolve '(`p1` < `p2`)' due to 
> data type mismatch: '(`p1` < `p2`)' requires (boolean or tinyint or smallint 
> or int or bigint or float or double or decimal or timestamp or date or string 
> or binary) type, not struct;;
> 'Filter (p1#5 < p2#8)
> +- Join Cross
>:- Project [named_struct(city, city#0, person, person#1) AS p1#5]
>:  +- LogicalRDD [city#0, person#1]
>+- Project [named_struct(city, city#0, person, person#1) AS p2#8]
>   +- LogicalRDD [city#0, person#1]
> {code}






[jira] [Commented] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-08-02 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1631#comment-1631
 ] 

Andrew Ray commented on SPARK-21034:


{{first}} is not a deterministic function and thus filters are not pushed 
through it; this is not a bug.
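
For illustration, a sketch of a workaround under that behavior (not part of the 
original comment; a Scala equivalent {{df}} of the reporter's DataFrame is 
assumed):

{code}
import org.apache.spark.sql.functions.{first, sum}

// Filtering before the aggregation avoids relying on pushdown through first().
df.where("a = 1").groupBy("a").agg(sum("b"), first("c")).explain()
{code}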

> Filter not getting pushed down the groupBy clause when first() or last() 
> aggregate function is used
> ---
>
> Key: SPARK-21034
> URL: https://issues.apache.org/jira/browse/SPARK-21034
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> Here is a sample code - 
> {code:java}
> from pyspark.sql import functions as F
> df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" 
> : 8},
>{ "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, 
> "c":7} ])
> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
> +- Exchange hashpartitioning(a#15L, 4)
>+- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
>   +- *Project [a#15L, b#16L]
>  +- *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> >>>
> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
>+- Exchange hashpartitioning(a#15L, 4)
>   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), 
> partial_first(c#17L, false)])
>  +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> {code}
> As you can see, the filter is not pushed down when F.first aggregate function 
> is used.






[jira] [Commented] (SPARK-21034) Filter not getting pushed down the groupBy clause when first() or last() aggregate function is used

2017-08-02 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111454#comment-16111454
 ] 

Andrew Ray commented on SPARK-21034:


Yes, a=1 is the filter to be pushed down. It is not pushed *through* the 
{{first}} aggregation because {{first}} is labeled as nondeterministic; see 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala#L47


> Filter not getting pushed down the groupBy clause when first() or last() 
> aggregate function is used
> ---
>
> Key: SPARK-21034
> URL: https://issues.apache.org/jira/browse/SPARK-21034
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> Here is a sample code - 
> {code:java}
> from pyspark.sql import functions as F
> df = spark.createDataFrame([{ "a": 1, "b" : 2, "c":7}, { "a": 3, "b" : 4, "c" 
> : 8},
>{ "a": 1, "b" : 5, "c":7}, { "a": 1, "b" : 6, 
> "c":7} ])
> df.groupBy(["a"]).agg(F.sum("b")).where("a = 1").explain()
> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *HashAggregate(keys=[a#15L], functions=[sum(b#16L)])
> +- Exchange hashpartitioning(a#15L, 4)
>+- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L)])
>   +- *Project [a#15L, b#16L]
>  +- *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> >>>
> >>> df.groupBy(["a"]).agg(F.sum("b"), F.first("c")).where("a = 1").explain()
> == Physical Plan ==
> *Filter (isnotnull(a#15L) && (a#15L = 1))
> +- *HashAggregate(keys=[a#15L], functions=[sum(b#16L), first(c#17L, false)])
>+- Exchange hashpartitioning(a#15L, 4)
>   +- *HashAggregate(keys=[a#15L], functions=[partial_sum(b#16L), 
> partial_first(c#17L, false)])
>  +- Scan ExistingRDD[a#15L,b#16L,c#17L]
> {code}
> As you can see, the filter is not pushed down when F.first aggregate function 
> is used.






[jira] [Created] (SPARK-21628) Explicitly specify Java version in maven compiler plugin so IntelliJ imports project correctly

2017-08-03 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-21628:
--

 Summary: Explicitly specify Java version in maven compiler plugin 
so IntelliJ imports project correctly
 Key: SPARK-21628
 URL: https://issues.apache.org/jira/browse/SPARK-21628
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 2.2.0
Reporter: Andrew Ray
Priority: Minor


see 
https://stackoverflow.com/questions/27037657/stop-intellij-idea-to-switch-java-language-level-every-time-the-pom-is-reloaded






[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException

2017-01-10 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15815460#comment-15815460
 ] 

Andrew Ray commented on SPARK-19136:


You did not do a _typed_ aggregation, so your result is a DataFrame. It can be 
manipulated back into a Dataset like:
{code}
agg.select("minmax.*").as[MinMax]
{code}

Or you can do a typed aggregation to get back a typed result, although there is 
no method for a global typed aggregate currently so the creation of a dummy 
group is needed: 
{code}
ds.groupByKey(_ => 1).agg(MinMaxAgg().toColumn)
{code}

> Aggregator with case class as output type fails with ClassCastException
> ---
>
> Key: SPARK-19136
> URL: https://issues.apache.org/jira/browse/SPARK-19136
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Mathieu D
>
> {{Aggregator}} with a case-class as output type returns a Row that cannot be 
> cast back to this type, it fails with {{ClassCastException}}.
> Here is a dummy example to reproduce the problem 
> {code}
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
> import spark.implicits._
> case class MinMax(min: Int, max: Int)
> case class MinMaxAgg() extends Aggregator[Row, (Int, Int), MinMax] with 
> Serializable {
>   def zero: (Int, Int) = (Int.MaxValue, Int.MinValue)
>   def reduce(b: (Int, Int), a: Row): (Int, Int) = (Math.min(b._1, 
> a.getAs[Int](0)), Math.max(b._2, a.getAs[Int](0)))
>   def finish(r: (Int, Int)): MinMax = MinMax(r._1, r._2)
>   def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = (Math.min(b1._1, 
> b2._1), Math.max(b1._2, b2._2))
>   def bufferEncoder: Encoder[(Int, Int)] = ExpressionEncoder()
>   def outputEncoder: Encoder[MinMax] = ExpressionEncoder()
> }
> val ds = Seq(1, 2, 3, 4).toDF("col1")
> val agg = ds.agg(MinMaxAgg().toColumn.alias("minmax"))
> {code}
> bq. {code}
> ds: org.apache.spark.sql.DataFrame = [col1: int]
> agg: org.apache.spark.sql.DataFrame = [minmax: struct<min:int,max:int>]
> {code}
> {code}agg.printSchema(){code}
> bq. {code}
> root
>  |-- minmax: struct (nullable = true)
>  ||-- min: integer (nullable = false)
>  ||-- max: integer (nullable = false)
> {code}
> {code}agg.head(){code}
> bq. {code}
> res1: org.apache.spark.sql.Row = [[1,4]]
> {code}
> {code}agg.head().getAs[MinMax](0){code}
> bq. {code}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to line4c81e18af34342cda654c381ee91139525.$read$$iw$$iw$$iw$$iw$MinMax
> [...]
> {code}






[jira] [Commented] (SPARK-8853) FPGrowth is not Java-Friendly

2017-01-10 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15816049#comment-15816049
 ] 

Andrew Ray commented on SPARK-8853:
---

But there is no reason to create a {{FPGrowthModel}} directly; proper usage is 
something like:
{code}
FPGrowthModel model = new FPGrowth()
  .setMinSupport(0.5)
  .setNumPartitions(2)
  .run(rdd);
{code}

> FPGrowth is not Java-Friendly
> -
>
> Key: SPARK-8853
> URL: https://issues.apache.org/jira/browse/SPARK-8853
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Reporter: Feynman Liang
>
> {{FPGrowth[Item : ClassTag]}} has a public constructor but uses a 
> parameterized type with a context bound {{ClassTag}}, making it difficult to 
> instantiate in Java.
> For example, to instantiate one needs to do
> {code:title=Foobar.java|borderStyle=solid}
> import scala.reflect.ClassTag;
> ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(String.class)
> FPGrowthModel model = new FPGrowthModel(rdd, 35L, tag);
> {code}






[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException

2017-01-13 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821868#comment-15821868
 ] 

Andrew Ray commented on SPARK-19136:


I forgot you can also just do:
{code}
ds.select(MinMaxAgg().toColumn)
{code}

> Aggregator with case class as output type fails with ClassCastException
> ---
>
> Key: SPARK-19136
> URL: https://issues.apache.org/jira/browse/SPARK-19136
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Mathieu D
>Priority: Minor
>
> {{Aggregator}} with a case-class as output type returns a Row that cannot be 
> cast back to this type, it fails with {{ClassCastException}}.
> Here is a dummy example to reproduce the problem 
> {code}
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
> import spark.implicits._
> case class MinMax(min: Int, max: Int)
> case class MinMaxAgg() extends Aggregator[Row, (Int, Int), MinMax] with 
> Serializable {
>   def zero: (Int, Int) = (Int.MaxValue, Int.MinValue)
>   def reduce(b: (Int, Int), a: Row): (Int, Int) = (Math.min(b._1, 
> a.getAs[Int](0)), Math.max(b._2, a.getAs[Int](0)))
>   def finish(r: (Int, Int)): MinMax = MinMax(r._1, r._2)
>   def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = (Math.min(b1._1, 
> b2._1), Math.max(b1._2, b2._2))
>   def bufferEncoder: Encoder[(Int, Int)] = ExpressionEncoder()
>   def outputEncoder: Encoder[MinMax] = ExpressionEncoder()
> }
> val ds = Seq(1, 2, 3, 4).toDF("col1")
> val agg = ds.agg(MinMaxAgg().toColumn.alias("minmax"))
> {code}
> bq. {code}
> ds: org.apache.spark.sql.DataFrame = [col1: int]
> agg: org.apache.spark.sql.DataFrame = [minmax: struct<min:int,max:int>]
> {code}
> {code}agg.printSchema(){code}
> bq. {code}
> root
>  |-- minmax: struct (nullable = true)
>  ||-- min: integer (nullable = false)
>  ||-- max: integer (nullable = false)
> {code}
> {code}agg.head(){code}
> bq. {code}
> res1: org.apache.spark.sql.Row = [[1,4]]
> {code}
> {code}agg.head().getAs[MinMax](0){code}
> bq. {code}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to line4c81e18af34342cda654c381ee91139525.$read$$iw$$iw$$iw$$iw$MinMax
> [...]
> {code}






[jira] [Commented] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file

2017-01-13 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821914#comment-15821914
 ] 

Andrew Ray commented on SPARK-19116:


The 2318 number is the total size on disk of the Parquet files that were written; 
the relation's size estimate comes from the file sizes, not from per-column defaults.
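
A quick way to see this from spark-shell is to compare the estimate with the bytes 
on disk (a sketch, using the path from the description):
{code}
val path = new org.apache.hadoop.fs.Path("c:/scratch/hundred_integers.parquet")
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
// total length of the files under the parquet output directory
fs.getContentSummary(path).getLength   // should be close to the 2318 reported above
{code}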

> LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
> -
>
> Key: SPARK-19116
> URL: https://issues.apache.org/jira/browse/SPARK-19116
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.1, 2.0.2
> Environment: Python 3.5.x
> Windows 10
>Reporter: Shea Parkes
>
> We're having some modestly severe issues with broadcast join inference, and 
> I've been chasing them through the join heuristics in the catalyst engine.  
> I've made it as far as I can, and I've hit upon something that does not make 
> any sense to me.
> I thought that loading from parquet would be a RelationPlan, which would just 
> use the sum of default sizeInBytes for each column times the number of rows.  
> But this trivial example shows that I am not correct:
> {code}
> import pyspark.sql.functions as F
> df_range = session.range(100).select(F.col('id').cast('integer'))
> df_range.write.parquet('c:/scratch/hundred_integers.parquet')
> df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet')
> df_parquet.explain(True)
> # Expected sizeInBytes
> integer_default_sizeinbytes = 4
> print(df_parquet.count() * integer_default_sizeinbytes)  # = 400
> # Inferred sizeInBytes
> print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes())  # = 2318
> # For posterity (Didn't really expect this to match anything above)
> print(df_range._jdf.logicalPlan().statistics().sizeInBytes())  # = 600
> {code}
> And here's the results of explain(True) on df_parquet:
> {code}
> In [456]: == Parsed Logical Plan ==
> Relation[id#794] parquet
> == Analyzed Logical Plan ==
> id: int
> Relation[id#794] parquet
> == Optimized Logical Plan ==
> Relation[id#794] parquet
> == Physical Plan ==
> *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: 
> file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct<id:int>
> {code}
> So basically, I'm not understanding well how the size of the parquet file is 
> being estimated.  I don't expect it to be extremely accurate, but empirically 
> it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold 
> way too much.  (It's not always too high like the example above, it's often 
> way too low.)
> Without deeper understanding, I'm considering a result of 2318 instead of 400 
> to be a bug.  My apologies if I'm missing something obvious.






[jira] [Commented] (SPARK-18568) vertex attributes in the edge triplet not getting updated in super steps for Pregel API

2017-01-13 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15822397#comment-15822397
 ] 

Andrew Ray commented on SPARK-18568:


RDDs have the same problem with cached collections of mutable objects:
{code}
import scala.collection.mutable.{Map => MMap}
val rdd = sc.parallelize(MMap(1->1)::Nil)
rdd.cache()
val rdd2 = rdd.map(_ += 2 ->2)

scala> rdd2.collect()
res21: Array[scala.collection.mutable.Map[Int,Int]] = Array(Map(2 -> 2, 1 -> 1))
scala> rdd.collect()
res22: Array[scala.collection.mutable.Map[Int,Int]] = Array(Map(2 -> 2, 1 -> 1))
{code}

So I think the moral is to use immutable objects.
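
For comparison, a sketch of the immutable version, which does not have this problem 
(expected output in the comments):
{code}
val rdd = sc.parallelize(Map(1 -> 1) :: Nil)   // immutable Map
rdd.cache()
val rdd2 = rdd.map(_ + (2 -> 2))   // builds a new Map; the cached values are untouched

rdd2.collect()   // Array(Map(1 -> 1, 2 -> 2))
rdd.collect()    // Array(Map(1 -> 1))
{code}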

> vertex attributes in the edge triplet not getting updated in super steps for 
> Pregel API
> ---
>
> Key: SPARK-18568
> URL: https://issues.apache.org/jira/browse/SPARK-18568
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.0.2
>Reporter: Rohit
>
> When running the Pregel API with vertex attribute as complex objects. The 
> vertex attributes are not getting updated in the triplet view. For example if 
> the vertex attributes changes in first superstep for vertex"a" the triplet 
> src attributes in the send msg program for the first super step gets the 
> latest attributes of the vertex "a" but on 2nd super step if the vertex 
> attributes changes in the vprog the edge triplets are not updated with this 
> new state of the vertex for all the edge triplets having the vertex "a" as 
> src or destination. if I re-create the graph using g = Graph(g.vertices, 
> g.edges) in the while loop before the next super step then its getting 
> updated. But this fix is not good performance wise. A detailed description of 
> the bug along with the code to recreate it is in the attached URL.






[jira] [Commented] (SPARK-16683) Group by does not work after multiple joins of the same dataframe

2017-01-20 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831878#comment-15831878
 ] 

Andrew Ray commented on SPARK-16683:


I'm working on a solution for this

> Group by does not work after multiple joins of the same dataframe
> -
>
> Key: SPARK-16683
> URL: https://issues.apache.org/jira/browse/SPARK-16683
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.6.0, 1.6.1, 1.6.2, 2.0.0
> Environment: local and yarn
>Reporter: Witold Jędrzejewski
> Attachments: code_2.0.txt, Duplicates Problem Presentation.json
>
>
> When I join a dataframe, group by a field from it, then join it again by 
> different field and group by field from it, second aggregation does not 
> trigger.
> Minimal example showing the problem is attached as the text to paste into 
> spark-shell (code_2.0.txt).
> The detailed description and minimal example, workaround and possible cause 
> are in the attachment, in a form of Zeppelin notebook.






[jira] [Resolved] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException

2017-03-23 Thread Andrew Ray (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray resolved SPARK-19136.

Resolution: Not A Bug

> Aggregator with case class as output type fails with ClassCastException
> ---
>
> Key: SPARK-19136
> URL: https://issues.apache.org/jira/browse/SPARK-19136
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Mathieu D
>Priority: Minor
>
> {{Aggregator}} with a case-class as output type returns a Row that cannot be 
> cast back to this type, it fails with {{ClassCastException}}.
> Here is a dummy example to reproduce the problem 
> {code}
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
> import spark.implicits._
> case class MinMax(min: Int, max: Int)
> case class MinMaxAgg() extends Aggregator[Row, (Int, Int), MinMax] with 
> Serializable {
>   def zero: (Int, Int) = (Int.MaxValue, Int.MinValue)
>   def reduce(b: (Int, Int), a: Row): (Int, Int) = (Math.min(b._1, 
> a.getAs[Int](0)), Math.max(b._2, a.getAs[Int](0)))
>   def finish(r: (Int, Int)): MinMax = MinMax(r._1, r._2)
>   def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = (Math.min(b1._1, 
> b2._1), Math.max(b1._2, b2._2))
>   def bufferEncoder: Encoder[(Int, Int)] = ExpressionEncoder()
>   def outputEncoder: Encoder[MinMax] = ExpressionEncoder()
> }
> val ds = Seq(1, 2, 3, 4).toDF("col1")
> val agg = ds.agg(MinMaxAgg().toColumn.alias("minmax"))
> {code}
> bq. {code}
> ds: org.apache.spark.sql.DataFrame = [col1: int]
> agg: org.apache.spark.sql.DataFrame = [minmax: struct<min:int,max:int>]
> {code}
> {code}agg.printSchema(){code}
> bq. {code}
> root
>  |-- minmax: struct (nullable = true)
>  ||-- min: integer (nullable = false)
>  ||-- max: integer (nullable = false)
> {code}
> {code}agg.head(){code}
> bq. {code}
> res1: org.apache.spark.sql.Row = [[1,4]]
> {code}
> {code}agg.head().getAs[MinMax](0){code}
> bq. {code}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to line4c81e18af34342cda654c381ee91139525.$read$$iw$$iw$$iw$$iw$MinMax
> [...]
> {code}






[jira] [Commented] (SPARK-20429) [GRAPHX] Strange results for personalized pagerank if node is involved in a cycle

2017-05-01 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991099#comment-15991099
 ] 

Andrew Ray commented on SPARK-20429:


Can you retest your example with Spark 2.2/master? SPARK-18847 probably fixed 
your issue.

> [GRAPHX] Strange results for personalized pagerank if node is involved in a 
> cycle
> -
>
> Key: SPARK-20429
> URL: https://issues.apache.org/jira/browse/SPARK-20429
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.1.0
>Reporter: Francesco Elia
>  Labels: graphx
>
> I'm trying to run the personalized PageRank implementation of GraphX on a 
> simple test graph, which is the following: 
> Image: https://i.stack.imgur.com/JDv1l.jpg
> I'm a bit confused on some results that I get when I try to compute the PPR 
> for a node that is involved in a cycle. For example, the final output for the 
> node 12 is as follows:
> (13, 0.0141)
> (7,  0.0141)
> (19, 0.0153)
> (17, 0.0153)
> (20, 0.0153)
> (11, 0.0391)
> (14, 0.0460)
> (15, 0.0541)
> (16, 0.0541)
> (12, 0.1832)
> I would clearly expect that the node 13 would have a much higher PPR value 
> (in fact, I would expect it to be the first one after the starting node 
> itself). The problem appears as well with other nodes involved in cycles, for 
> example for starting node 13 the node 15 has a very low score. From all the 
> testing that I have done it seems that for starting nodes that do not 
> participate in a cycle the result is exactly how I expect.






[jira] [Created] (SPARK-8718) Improve EdgePartition2D for non perfect square number of partitions

2015-06-29 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-8718:
-

 Summary: Improve EdgePartition2D for non perfect square number of 
partitions
 Key: SPARK-8718
 URL: https://issues.apache.org/jira/browse/SPARK-8718
 Project: Spark
  Issue Type: Improvement
  Components: GraphX
Reporter: Andrew Ray
Priority: Minor


The current implementation of EdgePartition2D has a major limitation:

bq. One of the limitations of this approach is that the number of machines must 
either be a perfect square. We partially address this limitation by computing 
the machine assignment to the next largest perfect square and then mapping back 
down to the actual number of machines. Unfortunately, this can also lead to 
work imbalance and so it is suggested that a perfect square is used.

To remove this limitation I'm proposing the following code change. It allows us 
to partition into any number of evenly sized bins while maintaining the 
property that any vertex will only need to be replicated at most 2 * 
sqrt(numParts) times. To maintain current behavior for perfect squares we use 
the old algorithm in that case, although this could be removed if we don't care 
about producing exactly the same result.
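
As a rough illustration of the binning idea only (a hand-written sketch, not the 
proposed patch; the two hash arguments are assumed to already be scaled into [0, 1)):
{code}
// split the unit square into ceil(sqrt(numParts)) vertical strips keyed by source,
// then spread exactly numParts cells as evenly as possible across those strips keyed
// by destination, so any vertex is replicated at most about 2 * sqrt(numParts) times
def bin(srcHash: Double, dstHash: Double, numParts: Int): Int = {
  val cols = math.ceil(math.sqrt(numParts)).toInt
  val col = math.min((srcHash * cols).toInt, cols - 1)
  val cellsBefore = (col.toLong * numParts / cols).toInt
  val cellsInCol = ((col + 1).toLong * numParts / cols).toInt - cellsBefore
  cellsBefore + math.min((dstHash * cellsInCol).toInt, cellsInCol - 1)
}
{code}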

See this IPython notebook for a visualization of what is being proposed 
[https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb] and download it 
to interactively change the number of partitions.






[jira] [Created] (SPARK-5159) Thrift server does not respect hive.server2.enable.doAs=true

2015-01-08 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-5159:
-

 Summary: Thrift server does not respect 
hive.server2.enable.doAs=true
 Key: SPARK-5159
 URL: https://issues.apache.org/jira/browse/SPARK-5159
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Andrew Ray


I'm testing the Spark SQL Thrift server on a Kerberos-secured cluster in YARN mode. 
Currently any user can access any table regardless of HDFS permissions, because all 
data is read as the hive user. In HiveServer2 the property hive.server2.enable.doAs=true 
causes all access to be performed as the submitting user. We should do the same.






[jira] [Commented] (SPARK-17458) Alias specified for aggregates in a pivot are not honored

2016-09-15 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15494361#comment-15494361
 ] 

Andrew Ray commented on SPARK-17458:


It's a1ray

> Alias specified for aggregates in a pivot are not honored
> -
>
> Key: SPARK-17458
> URL: https://issues.apache.org/jira/browse/SPARK-17458
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Ravi Somepalli
> Fix For: 2.1.0
>
>
> When using pivot and multiple aggregations we need to alias to avoid special 
> characters, but alias does not help because 
> df.groupBy("C").pivot("A").agg(avg("D").as("COLD"), max("B").as("COLB")).show
> ||C || bar_avg(`D`) AS `COLD` || bar_max(`B`) AS `COLB` || foo_avg(`D`) 
> AS `COLD` || foo_max(`B`) AS `COLB` ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> Expected Output
> ||C || bar_COLD || bar_COLB || foo_COLD || foo_COLB ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> One approach you can fix this issue is to change the class
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
>  and change the outputName method in 
> {code}
> object ResolvePivot extends Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = plan transform {
> {code}
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   val suffix = aggregate match {
>  case n: NamedExpression => 
> aggregate.asInstanceOf[NamedExpression].name
>  case _ => aggregate.sql
>}
>   if (singleAgg) value.toString else value + "_" + suffix
> }
> {code}
> Version : 2.0.0
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   if (singleAgg) value.toString else value + "_" + aggregate.sql
> }
> {code}






[jira] [Comment Edited] (SPARK-17458) Alias specified for aggregates in a pivot are not honored

2016-09-15 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15494361#comment-15494361
 ] 

Andrew Ray edited comment on SPARK-17458 at 9/15/16 8:09 PM:
-

[~hvanhovell] It's a1ray


was (Author: a1ray):
It's a1ray

> Alias specified for aggregates in a pivot are not honored
> -
>
> Key: SPARK-17458
> URL: https://issues.apache.org/jira/browse/SPARK-17458
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Ravi Somepalli
>Assignee: Herman van Hovell
> Fix For: 2.1.0
>
>
> When using pivot and multiple aggregations we need to alias to avoid special 
> characters, but alias does not help because 
> df.groupBy("C").pivot("A").agg(avg("D").as("COLD"), max("B").as("COLB")).show
> ||C || bar_avg(`D`) AS `COLD` || bar_max(`B`) AS `COLB` || foo_avg(`D`) 
> AS `COLD` || foo_max(`B`) AS `COLB` ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> Expected Output
> ||C || bar_COLD || bar_COLB || foo_COLD || foo_COLB ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> One approach you can fix this issue is to change the class
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
>  and change the outputName method in 
> {code}
> object ResolvePivot extends Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = plan transform {
> {code}
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   val suffix = aggregate match {
>  case n: NamedExpression => 
> aggregate.asInstanceOf[NamedExpression].name
>  case _ => aggregate.sql
>}
>   if (singleAgg) value.toString else value + "_" + suffix
> }
> {code}
> Version : 2.0.0
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   if (singleAgg) value.toString else value + "_" + aggregate.sql
> }
> {code}






[jira] [Issue Comment Deleted] (SPARK-17458) Alias specified for aggregates in a pivot are not honored

2016-09-15 Thread Andrew Ray (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray updated SPARK-17458:
---
Comment: was deleted

(was: [~hvanhovell] It's a1ray)

> Alias specified for aggregates in a pivot are not honored
> -
>
> Key: SPARK-17458
> URL: https://issues.apache.org/jira/browse/SPARK-17458
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Ravi Somepalli
>Assignee: Herman van Hovell
> Fix For: 2.1.0
>
>
> When using pivot and multiple aggregations we need to alias to avoid special 
> characters, but alias does not help because 
> df.groupBy("C").pivot("A").agg(avg("D").as("COLD"), max("B").as("COLB")).show
> ||C || bar_avg(`D`) AS `COLD` || bar_max(`B`) AS `COLB` || foo_avg(`D`) 
> AS `COLD` || foo_max(`B`) AS `COLB` ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> Expected Output
> ||C || bar_COLD || bar_COLB || foo_COLD || foo_COLB ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> One approach you can fix this issue is to change the class
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
>  and change the outputName method in 
> {code}
> object ResolvePivot extends Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = plan transform {
> {code}
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   val suffix = aggregate match {
>  case n: NamedExpression => 
> aggregate.asInstanceOf[NamedExpression].name
>  case _ => aggregate.sql
>}
>   if (singleAgg) value.toString else value + "_" + suffix
> }
> {code}
> Version : 2.0.0
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   if (singleAgg) value.toString else value + "_" + aggregate.sql
> }
> {code}






[jira] [Commented] (SPARK-17458) Alias specified for aggregates in a pivot are not honored

2016-09-15 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15494591#comment-15494591
 ] 

Andrew Ray commented on SPARK-17458:


[~hvanhovell]: My JIRA username is a1ray.

> Alias specified for aggregates in a pivot are not honored
> -
>
> Key: SPARK-17458
> URL: https://issues.apache.org/jira/browse/SPARK-17458
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Ravi Somepalli
>Assignee: Herman van Hovell
> Fix For: 2.1.0
>
>
> When using pivot and multiple aggregations we need to alias to avoid special 
> characters, but alias does not help because 
> df.groupBy("C").pivot("A").agg(avg("D").as("COLD"), max("B").as("COLB")).show
> ||C || bar_avg(`D`) AS `COLD` || bar_max(`B`) AS `COLB` || foo_avg(`D`) 
> AS `COLD` || foo_max(`B`) AS `COLB` ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> Expected Output
> ||C || bar_COLD || bar_COLB || foo_COLD || foo_COLB ||
> |small|   5.5|   two|2.3335|  
>  two|
> |large|   5.5|   two|   2.0|  
>  one|
> One approach you can fix this issue is to change the class
> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
>  and change the outputName method in 
> {code}
> object ResolvePivot extends Rule[LogicalPlan] {
> def apply(plan: LogicalPlan): LogicalPlan = plan transform {
> {code}
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   val suffix = aggregate match {
>  case n: NamedExpression => 
> aggregate.asInstanceOf[NamedExpression].name
>  case _ => aggregate.sql
>}
>   if (singleAgg) value.toString else value + "_" + suffix
> }
> {code}
> Version : 2.0.0
> {code}
> def outputName(value: Literal, aggregate: Expression): String = {
>   if (singleAgg) value.toString else value + "_" + aggregate.sql
> }
> {code}






[jira] [Commented] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6

2016-01-20 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109269#comment-15109269
 ] 

Andrew Ray commented on SPARK-12911:


In the current master this happens even without caching. The generated code 
calls UnsafeArrayData.equals for each row with an argument of type 
GenericArrayData (the filter value), which returns false. Presumably the 
generated code needs to convert the filter value to an UnsafeArrayData, but I'm 
at a loss as to where to do so.

[~sdicocco] This is a bug, not something a developer should have to worry about.

> Cacheing a dataframe causes array comparisons to fail (in filter / where) 
> after 1.6
> ---
>
> Key: SPARK-12911
> URL: https://issues.apache.org/jira/browse/SPARK-12911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
> Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0
>Reporter: Jesse English
>
> When doing a *where* operation on a dataframe and testing for equality on an 
> array type, after 1.6 no valid comparisons are made if the dataframe has been 
> cached.  If it has not been cached, the results are as expected.
> This appears to be related to the underlying unsafe array data types.
> {code:title=test.scala|borderStyle=solid}
> test("test array comparison") {
> val vectors: Vector[Row] =  Vector(
>   Row.fromTuple("id_1" -> Array(0L, 2L)),
>   Row.fromTuple("id_2" -> Array(0L, 5L)),
>   Row.fromTuple("id_3" -> Array(0L, 9L)),
>   Row.fromTuple("id_4" -> Array(1L, 0L)),
>   Row.fromTuple("id_5" -> Array(1L, 8L)),
>   Row.fromTuple("id_6" -> Array(2L, 4L)),
>   Row.fromTuple("id_7" -> Array(5L, 6L)),
>   Row.fromTuple("id_8" -> Array(6L, 2L)),
>   Row.fromTuple("id_9" -> Array(7L, 0L))
> )
> val data: RDD[Row] = sc.parallelize(vectors, 3)
> val schema = StructType(
>   StructField("id", StringType, false) ::
> StructField("point", DataTypes.createArrayType(LongType, false), 
> false) ::
> Nil
> )
> val sqlContext = new SQLContext(sc)
> val dataframe = sqlContext.createDataFrame(data, schema)
> val targetPoint:Array[Long] = Array(0L,9L)
> //Cacheing is the trigger to cause the error (no cacheing causes no error)
> dataframe.cache()
> //This is the line where it fails
> //java.util.NoSuchElementException: next on empty iterator
> //However we know that there is a valid match
> val targetRow = dataframe.where(dataframe("point") === 
> array(targetPoint.map(value => lit(value)): _*)).first()
> assert(targetRow != null)
>   }
> {code}






[jira] [Created] (SPARK-11690) Add pivot to python api

2015-11-11 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-11690:
--

 Summary: Add pivot to python api
 Key: SPARK-11690
 URL: https://issues.apache.org/jira/browse/SPARK-11690
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Andrew Ray
Priority: Minor


Add pivot method to the python api GroupedData class






[jira] [Created] (SPARK-12184) Make python api doc for pivot consistent with scala doc

2015-12-07 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-12184:
--

 Summary: Make python api doc for pivot consistent with scala doc
 Key: SPARK-12184
 URL: https://issues.apache.org/jira/browse/SPARK-12184
 Project: Spark
  Issue Type: Documentation
  Components: PySpark
Affects Versions: 1.6.0
Reporter: Andrew Ray
Priority: Trivial


In SPARK-11946 the API for pivot was changed a bit and its documentation was updated, 
but the corresponding doc changes were not made for the Python API.






[jira] [Created] (SPARK-12205) Pivot fails Analysis when aggregate is UnresolvedFunction

2015-12-08 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-12205:
--

 Summary: Pivot fails Analysis when aggregate is UnresolvedFunction
 Key: SPARK-12205
 URL: https://issues.apache.org/jira/browse/SPARK-12205
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: Andrew Ray









[jira] [Created] (SPARK-12211) Incorrect version number in graphx doc for migration from 1.1

2015-12-08 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-12211:
--

 Summary: Incorrect version number in graphx doc for migration from 
1.1
 Key: SPARK-12211
 URL: https://issues.apache.org/jira/browse/SPARK-12211
 Project: Spark
  Issue Type: Documentation
  Components: Documentation, GraphX
Affects Versions: 1.5.2, 1.5.1, 1.5.0, 1.4.1, 1.4.0, 1.3.1, 1.3.0, 1.2.2, 
1.2.1, 1.2.0, 1.6.0
Reporter: Andrew Ray
Priority: Minor


The "Migrating from Spark 1.1" section added to the GraphX doc in 1.2.0 (see 
https://spark.apache.org/docs/1.2.0/graphx-programming-guide.html#migrating-from-spark-11)
uses {{site.SPARK_VERSION}} as the version where the changes were introduced; it 
should be just 1.2.






[jira] [Commented] (SPARK-9042) Spark SQL incompatibility if security is enforced on the Hive warehouse

2015-12-16 Thread Andrew Ray (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15060208#comment-15060208
 ] 

Andrew Ray commented on SPARK-9042:
---

Sean, I think there are a couple of issues going on here. In my experience with 
the Sentry HDFS plugin, you can read tables just fine from Spark (which was the 
stated issue here). However, there are other, similar issues that are real: you 
can't create or modify any tables. Two things are at play. First, HDFS 
permissions: the Sentry HDFS plugin only gives you read access. Second, Hive 
metastore permissions: even if you create the table in some other HDFS location 
that you have write access to, you will still fail, because you can't modify the 
Hive metastore; it has a whitelist of users that is by default set to just hive 
and impala.

> Spark SQL incompatibility if security is enforced on the Hive warehouse
> ---
>
> Key: SPARK-9042
> URL: https://issues.apache.org/jira/browse/SPARK-9042
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.2.0
>Reporter: Nitin Kak
>
> Hive queries executed from Spark using HiveContext use CLI to create the 
> query plan and then access the Hive table directories(under 
> /user/hive/warehouse/) directly. This gives AccessContolException if Apache 
> Sentry is installed:
> org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=kakn, access=READ_EXECUTE, 
> inode="/user/hive/warehouse/mastering.db/sample_table":hive:hive:drwxrwx--t 
> With Apache Sentry, only "hive" user(created only for Sentry) has the 
> permissions to access the hive warehouse directory. After Sentry 
> installations all the queries are directed to HiveServer2 which translates 
> the changes the invoking user to "hive" and then access the hive warehouse 
> directory. However, HiveContext does not execute the query through 
> HiveServer2 which is leading to the issue. Here is an example of executing 
> hive query through HiveContext.
> val hqlContext = new HiveContext(sc) // Create context to run Hive queries 
> val pairRDD = hqlContext.sql(hql) // where hql is the string with hive query 






[jira] [Created] (SPARK-13749) Faster pivot implementation for many distinct values with two phase aggregation

2016-03-08 Thread Andrew Ray (JIRA)
Andrew Ray created SPARK-13749:
--

 Summary: Faster pivot implementation for many distinct values with 
two phase aggregation
 Key: SPARK-13749
 URL: https://issues.apache.org/jira/browse/SPARK-13749
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Andrew Ray


The existing implementation of pivot translates into a single aggregation with 
one aggregate per distinct pivot value. When the number of distinct pivot 
values is large (say 1000+) this can get extremely slow since each input value 
gets evaluated on every aggregate even though it only affects the value of one 
of them.

I'm proposing an alternate strategy for when there are 10+ (a somewhat arbitrary 
threshold) distinct pivot values, using two phases of aggregation. In the first 
phase we group by the grouping columns plus the pivot column and perform the 
specified aggregations (one or sometimes more). In the second phase we group by 
the grouping columns only and use the new (non-public) PivotFirst aggregate, 
which rearranges the outputs of the first phase into an array indexed by the 
pivot value. Finally we do a project to extract the array entries into the 
appropriate output columns.
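
For intuition, the same two-phase shape can be written by hand at the DataFrame 
level (a sketch only, assuming spark.implicits._ and functions._; df, group, 
pivotCol and value are hypothetical names, and the real change uses the internal 
PivotFirst aggregate rather than a second pivot):
{code}
// phase 1: reduce by (grouping columns + pivot column), so each input row is
// evaluated by only one aggregate
val phase1 = df.groupBy($"group", $"pivotCol").agg(sum($"value").as("v"))
// phase 2: spread the already-reduced values into one column per pivot value
val result = phase1.groupBy($"group").pivot("pivotCol").agg(first($"v"))
{code}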






[jira] [Created] (SPARK-39728) Test for parity of SQL functions between Python and JVM DataFrame API's

2022-07-09 Thread Andrew Ray (Jira)
Andrew Ray created SPARK-39728:
--

 Summary: Test for parity of SQL functions between Python and JVM 
DataFrame API's
 Key: SPARK-39728
 URL: https://issues.apache.org/jira/browse/SPARK-39728
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, Tests
Affects Versions: 3.3.0
Reporter: Andrew Ray


Add a unit test that compares the available list of Python DataFrame functions 
in pyspark.sql.functions with those available in the Scala/Java DataFrame API 
in org.apache.spark.sql.functions.






[jira] [Updated] (SPARK-39728) Test for parity of SQL functions between Python and JVM DataFrame API's

2022-07-09 Thread Andrew Ray (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Ray updated SPARK-39728:
---
Priority: Minor  (was: Major)

> Test for parity of SQL functions between Python and JVM DataFrame API's
> ---
>
> Key: SPARK-39728
> URL: https://issues.apache.org/jira/browse/SPARK-39728
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Tests
>Affects Versions: 3.3.0
>Reporter: Andrew Ray
>Priority: Minor
>
> Add a unit test that compares the available list of Python DataFrame 
> functions in pyspark.sql.functions with those available in the Scala/Java 
> DataFrame API in org.apache.spark.sql.functions.






[jira] [Created] (SPARK-39733) Add map_contains_key to pyspark.sql.functions

2022-07-10 Thread Andrew Ray (Jira)
Andrew Ray created SPARK-39733:
--

 Summary: Add map_contains_key to pyspark.sql.functions
 Key: SPARK-39733
 URL: https://issues.apache.org/jira/browse/SPARK-39733
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 3.3.0
Reporter: Andrew Ray


SPARK-37584 added the function map_contains_key to SQL and Scala/Java 
functions. This JIRA is to track its addition to the PySpark function set for 
parity.
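
For reference, a sketch of the existing Scala usage that the Python function would 
mirror (spark-shell, with spark.implicits._ and org.apache.spark.sql.functions._ in 
scope; the data is made up):
{code}
val df = Seq(Map(1 -> "a", 2 -> "b")).toDF("m")
df.select(map_contains_key(col("m"), 2)).show()   // expect true
{code}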






[jira] [Created] (SPARK-39734) Add call_udf to pyspark.sql.functions

2022-07-10 Thread Andrew Ray (Jira)
Andrew Ray created SPARK-39734:
--

 Summary: Add call_udf to pyspark.sql.functions
 Key: SPARK-39734
 URL: https://issues.apache.org/jira/browse/SPARK-39734
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 3.3.0
Reporter: Andrew Ray


Add the call_udf function to PySpark for parity with the Scala/Java function 
org.apache.spark.sql.functions#call_udf
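
A sketch of the existing Scala usage that the Python function would mirror 
(spark-shell, with implicits and functions._ in scope; the UDF name is made up):
{code}
spark.udf.register("plusOne", (x: Int) => x + 1)
Seq(1, 2, 3).toDF("v").select(call_udf("plusOne", col("v"))).show()
{code}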






[jira] [Created] (SPARK-39883) Add DataFrame function parity check

2022-07-26 Thread Andrew Ray (Jira)
Andrew Ray created SPARK-39883:
--

 Summary: Add DataFrame function parity check
 Key: SPARK-39883
 URL: https://issues.apache.org/jira/browse/SPARK-39883
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Andrew Ray


Add a test that compares the available list of DataFrame functions in 
org.apache.spark.sql.functions with the SQL function registry.
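
A sketch of what the comparison could look like from spark-shell (an allow-list of 
intentionally SQL-only expressions would still be needed):
{code}
// names registered in the SQL function registry
val sqlNames = spark.catalog.listFunctions().collect().map(_.name).toSet
// names exposed by the DataFrame API
val dfNames = org.apache.spark.sql.functions.getClass.getMethods.map(_.getName).toSet
// SQL functions with no same-named DataFrame counterpart
(sqlNames -- dfNames).toSeq.sorted.foreach(println)
{code}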






[jira] [Created] (SPARK-39897) StackOverflowError in TaskMemoryManager

2022-07-27 Thread Andrew Ray (Jira)
Andrew Ray created SPARK-39897:
--

 Summary: StackOverflowError in TaskMemoryManager
 Key: SPARK-39897
 URL: https://issues.apache.org/jira/browse/SPARK-39897
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.7
Reporter: Andrew Ray


I have observed the following error, which looks to stem from 
TaskMemoryManager.allocatePage making a recursive call to itself when a page 
cannot be allocated. I'm observing this in Spark 2.4, but since the relevant 
code is still the same in master, this is likely still a potential point of 
failure in current versions. Prioritizing this as minor since it looks to be a 
very uncommon outcome and I cannot find any other reports of a similar nature.
{code:java}
Py4JJavaError: An error occurred while calling o625.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
at 
org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:503)
at 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:217)
at 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:177)
at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at 
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at 
org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:474)
at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:453)
at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:409)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
at 
java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1012)
at 
java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1535)
at java.lang.ClassLoader.getClassLoadingLock(ClassLoader.java:457)
at java.lang.ClassLoader.loadClass(ClassLoader.java:398)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:512)
at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2657)
at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1518)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1482)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1436)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1436)
at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1370)
at java.util.ResourceBundle.getBundle(ResourceBundle.java:899)
at sun.util.resources.LocaleData$1.run(LocaleData.java:167)
a