[jira] [Updated] (SPARK-41277) Leverage shuffle key as bucketing properties

2022-12-26 Thread Ohad Raviv (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-41277:
---
Summary: Leverage shuffle key as bucketing properties  (was: Save and 
leverage shuffle key in tblproperties)

> Leverage shuffle key as bucketing properties
> 
>
> Key: SPARK-41277
> URL: https://issues.apache.org/jira/browse/SPARK-41277
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> I'm not sure if I'm missing something trivial here.
> In a typical pipeline, many datasets get materialized, many of them right after a shuffle (e.g. a join). They are then involved in further actions, often keyed on the same columns.
> Wouldn't it make sense to save the shuffle key along with the table, to avoid unnecessary shuffles?
> The implementation also seems quite straightforward: just leverage the existing bucketing mechanism.
>  






[jira] [Commented] (SPARK-41277) Save and leverage shuffle key in tblproperties

2022-12-25 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651877#comment-17651877
 ] 

Ohad Raviv commented on SPARK-41277:


I managed to put together a quick-and-dirty solution, just to be able to check it on existing processes.

I had to set `spark.sql.legacy.createHiveTableByDefault=false`, since the Hive provider, Spark and bucketing do not play nicely together (Spark uses a different hash function than Hive).

Then I added a custom optimization rule:
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.internal.SQLConf

object BucketingRule extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // intercept CTAS commands whose (resolved) input query is an aggregate
    case c @ CreateDataSourceTableAsSelectCommand(table, SaveMode.ErrorIfExists, query, _)
        if query.resolved =>
      query match {
        case Aggregate(grouping, _, _) =>
          // bucket the output table by the grouping keys, using the current
          // number of shuffle partitions as the bucket count
          val numBuckets = SQLConf.get.numShufflePartitions
          val bucketSpec = BucketSpec(
            numBuckets,
            grouping.map(_.asInstanceOf[AttributeReference].name),
            Nil)
          c.copy(table = table.copy(bucketSpec = Some(bucketSpec)))
        case _ => c
      }
  }
}

spark.sessionState.experimentalMethods.extraOptimizations ++= BucketingRule :: Nil
{code}
And it works on this mock:
{code:java}
// 15 keys ("k_1", "k_3", ...), two rows each
(1 to 30).map(i => ("k_" + (i - (1 - i % 2)), "v_" + i))
  .toDF("id", "val")
  .createOrReplaceTempView("t")

// tbl1 goes through the CTAS path that the rule above intercepts
spark.sql("create table tbl1 select id, max(val) val, count(1) cnt from t group by id")
// tbl2 is bucketed explicitly on the same key, for the join
spark.table("t").write.bucketBy(3, "id").saveAsTable("tbl2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val dfPlan = spark.sql("create table tbl3 as select tbl1.* from tbl1" +
  " join tbl2 on tbl1.id=tbl2.id")

dfPlan.explain(true)
spark.table("tbl3").show()
{code}
You can see that `tbl1` gets created as a bucketed table.

I will try to see whether we get any noticeable performance gain.

Meanwhile, could you suggest or point me to a better solution?
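
For completeness, a quick way to confirm the effect (not part of the original comment; table names are the ones from the mock above):
{code:java}
// 1. the catalog entry for tbl1 should now show "Num Buckets" and "Bucket Columns: [id]"
spark.sql("describe formatted tbl1").show(100, false)

// 2. inspect the join plan: a side whose bucketing matches the join key (and a
//    compatible bucket count) can be read without an Exchange, i.e. no shuffle
spark.sql("select tbl1.* from tbl1 join tbl2 on tbl1.id = tbl2.id").explain()
{code}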

 

 

 

> Save and leverage shuffle key in tblproperties
> --
>
> Key: SPARK-41277
> URL: https://issues.apache.org/jira/browse/SPARK-41277
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> I'm not sure if I'm missing something trivial here.
> In a typical pipeline, many datasets get materialized, many of them right after a shuffle (e.g. a join). They are then involved in further actions, often keyed on the same columns.
> Wouldn't it make sense to save the shuffle key along with the table, to avoid unnecessary shuffles?
> The implementation also seems quite straightforward: just leverage the existing bucketing mechanism.
>  






[jira] [Commented] (SPARK-41277) Save and leverage shuffle key in tblproperties

2022-12-19 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17649300#comment-17649300
 ] 

Ohad Raviv commented on SPARK-41277:


[~gurwls223] - can I please get your opinion here?

> Save and leverage shuffle key in tblproperties
> --
>
> Key: SPARK-41277
> URL: https://issues.apache.org/jira/browse/SPARK-41277
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> I'm not sure if I'm missing something trivial here.
> In a typical pipeline, many datasets get materialized, many of them right after a shuffle (e.g. a join). They are then involved in further actions, often keyed on the same columns.
> Wouldn't it make sense to save the shuffle key along with the table, to avoid unnecessary shuffles?
> The implementation also seems quite straightforward: just leverage the existing bucketing mechanism.
>  






[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646973#comment-17646973
 ] 

Ohad Raviv commented on SPARK-41510:


OK, after diving into the code I think I found what I was looking for:
{code:python}
spark._sc._python_includes.append("/shared_nfs/my_folder") {code}
but it is kind of hacky. Tell me what you think, and whether we could make it more official/documented.
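
As a sanity check (not from the original comment), one way to verify the hack actually reaches the Python workers is to ask a UDF for its sys.path; the folder name is the same illustrative one as above:
{code:python}
import sys
from pyspark.sql.functions import udf

# assuming the _python_includes append above has already been executed
@udf("boolean")
def shared_folder_on_path():
    # runs on the executor's Python worker
    return any("my_folder" in p for p in sys.path)

spark.range(1).select(shared_folder_on_path()).show()
{code}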

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  






[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646734#comment-17646734
 ] 

Ohad Raviv commented on SPARK-41510:


The conda solution is more suited to "static" packages.

The scenario is this:

We're developing a Python library (more than one .py file) and want to do it interactively in notebooks. So we have all the modules in some folder, and we add this folder to the driver's sys.path.

Then, if we use a function from the module inside a UDF, for example, we get: "ModuleNotFoundError: No module named 'some_module'".

The reason is that some_module is not on the workers' PYTHONPATH/sys.path. The code itself is accessible to the workers, for example in a shared NFS folder.

So all we need now is to add the path.

We can do it inside the UDF, something like:
{code:python}
if "/shared_nfs/my_folder" not in sys.path:
    sys.path.insert(0, "/shared_nfs/my_folder")
{code}
but that is both very ugly and only a partial solution, as it works only in the UDF case.

The suggestion is to have some mechanism to easily add a folder to the workers' sys.path.

The option of wrapping the code in a zip/egg and adding it makes for a very long development cycle, and requires restarting the Spark session, so the notebook loses its state.

With the suggestion above we could edit the Python package interactively and see the changes almost immediately.

I hope it is clearer now.
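
For reference, a sketch of the "dummy task" workaround the ticket description alludes to (the path is illustrative, and this is best-effort: it only reaches Python workers that the job actually touches):
{code:python}
import sys

SHARED = "/shared_nfs/my_folder"  # illustrative path

def _prepend_shared_path(_):
    # runs inside each executor's Python worker
    if SHARED not in sys.path:
        sys.path.insert(0, SHARED)
    return iter([])

# spread empty work over many partitions so that (most) executors are hit
spark.sparkContext.parallelize(range(1000), 100).mapPartitions(_prepend_shared_path).count()
{code}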

 

 

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  






[jira] [Comment Edited] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606
 ] 

Ohad Raviv edited comment on SPARK-41510 at 12/13/22 12:23 PM:
---

[~hvanhovell], [~gurwls223] - can you please look at this/refer that to someone?


was (Author: uzadude):
[~hvanhovell] - can you please refer that to someone?

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  






[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606
 ] 

Ohad Raviv commented on SPARK-41510:


[~hvanhovell] - can you please refer that to someone?

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  






[jira] [Updated] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-41510:
---
Description: 
When working interactively with Spark through notebooks in various environments (Databricks/YARN) I often run into a very frustrating process when trying to add new Python modules, or change their code, without starting a new Spark session/cluster.

On the driver side it is easy to add things like `sys.path.append()`, but if, for example, UDF code imports a function from a local module, the pickle boundary assumes that the module exists on the workers, and the job fails with "python module does not exist..".

To update the code "online" I can add an NFS volume to the workers' PYTHONPATH.

However, setting the PYTHONPATH on the workers is not easy, as it gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path.

I think all of that could easily be solved if we just add a dedicated `spark.conf` that will get merged into the worker's PYTHONPATH, right here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

Please tell me what you think, and I will make the PR.

Thanks.

 

 

  was:
When working interactively with Spark through notebooks in various envs - 
Databricks/YARN I often encounter a very frustrating process of trying to add 
new python modules and even change their code without starting a new spark 
session/cluster.

in the driver side it is easy to add things like `sys.path.append()` but if for 
example UDF code is importing function from a local module, then the pickle 
boundaries will assume that the module exists in the workers. and then I fail 
on "python module does not exist..".

adding NFS volumes to the workers PYTHONPATH could solve it, but it requires 
restarting the session/cluster and worse doesn't work in all envs as the 
PYTHONPATH gets overridden by someone (databricks/spark) along the way. a few 
ugly work around are suggested like running a "dummy" udf on workers to add the 
folder to the sys.path.

I think all of that could easily be solved if we add a spark.conf to add to the 
worker PYTHONPATH. here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

 

please tell me what you think, and I will make the PR.

thanks.

 

 


> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  






[jira] [Created] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-41510:
--

 Summary: Support easy way for user defined PYTHONPATH in workers
 Key: SPARK-41510
 URL: https://issues.apache.org/jira/browse/SPARK-41510
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.1
Reporter: Ohad Raviv


When working interactively with Spark through notebooks in various environments (Databricks/YARN) I often run into a very frustrating process when trying to add new Python modules, or change their code, without starting a new Spark session/cluster.

On the driver side it is easy to add things like `sys.path.append()`, but if, for example, UDF code imports a function from a local module, the pickle boundary assumes that the module exists on the workers, and then I fail with "python module does not exist..".

Adding NFS volumes to the workers' PYTHONPATH could solve it, but that requires restarting the session/cluster and, worse, doesn't work in all environments, as the PYTHONPATH gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path.

I think all of that could easily be solved if we add a spark.conf whose value is appended to the worker PYTHONPATH, here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

Please tell me what you think, and I will make the PR.

Thanks.
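
Purely as an illustration of the intent (the config name below is hypothetical; no such setting exists in Spark today):
{code:python}
from pyspark.sql import SparkSession

# "spark.python.worker.extraPythonPath" is a HYPOTHETICAL name used only to
# sketch the proposal; PythonWorkerFactory would merge its value into the
# PYTHONPATH it builds for the Python daemon/worker processes.
spark = (
    SparkSession.builder
    .config("spark.python.worker.extraPythonPath", "/shared_nfs/my_folder")
    .getOrCreate()
)
{code}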

 

 






[jira] [Created] (SPARK-41277) Save and leverage shuffle key in tblproperties

2022-11-27 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-41277:
--

 Summary: Save and leverage shuffle key in tblproperties
 Key: SPARK-41277
 URL: https://issues.apache.org/jira/browse/SPARK-41277
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.1
Reporter: Ohad Raviv


I'm not sure if I'm missing something trivial here.

In a typical pipeline, many datasets get materialized, many of them right after a shuffle (e.g. a join). They are then involved in further actions, often keyed on the same columns.

Wouldn't it make sense to save the shuffle key along with the table, to avoid unnecessary shuffles?

The implementation also seems quite straightforward: just leverage the existing bucketing mechanism.
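
For context, a sketch of what can already be done by hand today, and what the proposal would effectively automate for materialized shuffle outputs (table and column names below are illustrative):
{code:java}
// write the aggregation output bucketed (and sorted) by its grouping key
spark.table("events")
  .groupBy("user_id").count()
  .write
  .bucketBy(200, "user_id")
  .sortBy("user_id")
  .saveAsTable("user_counts")

// later jobs that join or aggregate on the same key can reuse the layout and
// avoid re-shuffling this side (given a compatible bucket count on the other side)
spark.table("user_counts")
  .join(spark.table("user_profiles"), "user_id")
  .explain()
{code}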

 






[jira] [Commented] (SPARK-37752) Python UDF fails when it should not get evaluated

2021-12-27 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465972#comment-17465972
 ] 

Ohad Raviv commented on SPARK-37752:


That is what I deduced. Thanks for the answer!

> Python UDF fails when it should not get evaluated
> -
>
> Key: SPARK-37752
> URL: https://issues.apache.org/jira/browse/SPARK-37752
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.4
>Reporter: Ohad Raviv
>Priority: Minor
>
> Haven't checked on newer versions yet.
> If i define in Python:
> {code:java}
> def udf1(col1):
>     print(col1[2])
>     return "blah"
> spark.udf.register("udf1", udf1) {code}
> and then use it in SQL:
> {code:java}
> select case when length(c)>2 then udf1(c) end
> from (
>     select explode(array("123","234","12")) as c
> ) {code}
> it fails on:
> {noformat}
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, 
> in main
> process()
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, 
> in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 155, 
> in 
> func = lambda _, it: map(mapper, it)
>   File "", line 1, in 
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in 
> 
> return lambda *a: f(*a)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in 
> wrapper
> return f(*args, **kwargs)
>   File "", line 3, in udf1
> IndexError: string index out of range{noformat}
> Although in the out-of-range row it should not get evaluated at all as the 
> case-when filters for lengths of more than 2 letters.
> the same scenario works great when we define instead a Scala UDF.
> will check now if it happens also for newer versions.






[jira] [Created] (SPARK-37752) Python UDF fails when it should not get evaluated

2021-12-27 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-37752:
--

 Summary: Python UDF fails when it should not get evaluated
 Key: SPARK-37752
 URL: https://issues.apache.org/jira/browse/SPARK-37752
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.4
Reporter: Ohad Raviv


Haven't checked on newer versions yet.

If I define in Python:
{code:java}
def udf1(col1):
    print(col1[2])
    return "blah"

spark.udf.register("udf1", udf1) {code}
and then use it in SQL:
{code:java}
select case when length(c)>2 then udf1(c) end
from (
    select explode(array("123","234","12")) as c
) {code}
it fails on:
{noformat}
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in 
main
process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in 
process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 155, in 

func = lambda _, it: map(mapper, it)
  File "", line 1, in 
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in 

return lambda *a: f(*a)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in 
wrapper
return f(*args, **kwargs)
  File "", line 3, in udf1
IndexError: string index out of range{noformat}
Although for the out-of-range row it should not get evaluated at all, as the CASE WHEN filters for lengths of more than 2 characters.

The same scenario works fine when we define a Scala UDF instead.

I will now check whether it also happens on newer versions.
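
A sketch of the usual mitigation (not from this ticket): since the Python UDF may be invoked even for rows the CASE WHEN would exclude, make the UDF itself tolerant of short inputs.
{code:python}
def udf1_safe(col1):
    # guard inside the UDF instead of relying on CASE WHEN short-circuiting
    if col1 is None or len(col1) <= 2:
        return None
    print(col1[2])
    return "blah"

spark.udf.register("udf1_safe", udf1_safe)

spark.sql("""
    select case when length(c) > 2 then udf1_safe(c) end
    from (select explode(array('123', '234', '12')) as c)
""").show()
{code}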






[jira] [Created] (SPARK-34416) Support avroSchemaUrl in addition to avroSchema

2021-02-10 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-34416:
--

 Summary: Support avroSchemaUrl in addition to avroSchema
 Key: SPARK-34416
 URL: https://issues.apache.org/jira/browse/SPARK-34416
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0, 2.3.0, 3.2.0
Reporter: Ohad Raviv


We have a use case in which we read a huge table in Avro format. About 30k 
columns.

Using the default Hive reader, `AvroGenericRecordReader`, it just hangs forever; after 4 hours not even one task has finished.

We tried instead to use `spark.read.format("com.databricks.spark.avro").load(..)`, but we failed with:

{noformat}
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
..
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:421)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
  ... 53 elided
{noformat}

 

because the files' schema contains duplicate column names (when compared case-insensitively).

So we wanted to provide a user schema with non-duplicated fields, but the schema is huge, a few MBs; it is not practical to provide it inline as JSON.

 

So we patched spark-avro to also accept `avroSchemaUrl` in addition to `avroSchema`, and it worked perfectly.
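
A sketch of the patched usage the ticket describes (option name as described in the ticket; paths are illustrative):
{code:java}
// point the Avro reader at an .avsc file instead of passing the multi-MB
// schema string inline via the avroSchema option
val df = spark.read
  .format("avro")
  .option("avroSchemaUrl", "hdfs:///schemas/huge_table.avsc")
  .load("/data/huge_table")
{code}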

 

 

 

 

 






[jira] [Commented] (SPARK-30739) unable to turn off Hadoop's trash feature

2020-02-06 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17031437#comment-17031437
 ] 

Ohad Raviv commented on SPARK-30739:


Closing as I realized this is actually the documented behaviour 
[here|https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/core-default.xml].

_fs.trash.checkpoint.interval_

_Number of minutes between trash checkpoints. Should be smaller or equal to 
fs.trash.interval. If zero, the value is set to the value of fs.trash.interval. 
Every time the checkpointer runs it creates a new checkpoint out of current and 
removes checkpoints created more than fs.trash.interval minutes ago._

So I decided to use the _fs.trash.classname_ approach.

> unable to turn off Hadoop's trash feature
> -
>
> Key: SPARK-30739
> URL: https://issues.apache.org/jira/browse/SPARK-30739
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> We're trying to turn off the `TrashPolicyDefault` in one of our Spark 
> applications by setting `spark.hadoop.fs.trash.interval=0`, but it just stays 
> `360` as configured in our cluster's `core-site.xml`.
> Trying to debug it we managed to set 
> `spark.hadoop.fs.trash.classname=OtherTrashPolicy` and it worked. the main 
> difference seems to be that `spark.hadoop.fs.trash.classname` does not appear 
> in any of the `*-site.xml` files.
> when we print the conf that get initialized in `TrashPolicyDefault` we get:
> ```
> Configuration: core-default.xml, core-site.xml, yarn-default.xml, 
> yarn-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, 
> hdfs-site.xml, 
> org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@561f0431, 
> file:/hadoop03/yarn/local/usercache/.../hive-site.xml
> ```
> and:
> `fs.trash.interval=360 [programatically]`
> `fs.trash.classname=OtherTrashPolicy [programatically]`
>  
> any idea why `fs.trash.classname` works but `fs.trash.interval` doesn't?
> this seems maybe related to: -SPARK-9825.-
>  
>  






[jira] [Resolved] (SPARK-30739) unable to turn off Hadoop's trash feature

2020-02-06 Thread Ohad Raviv (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv resolved SPARK-30739.

Resolution: Workaround

> unable to turn off Hadoop's trash feature
> -
>
> Key: SPARK-30739
> URL: https://issues.apache.org/jira/browse/SPARK-30739
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> We're trying to turn off the `TrashPolicyDefault` in one of our Spark 
> applications by setting `spark.hadoop.fs.trash.interval=0`, but it just stays 
> `360` as configured in our cluster's `core-site.xml`.
> Trying to debug it we managed to set 
> `spark.hadoop.fs.trash.classname=OtherTrashPolicy` and it worked. the main 
> difference seems to be that `spark.hadoop.fs.trash.classname` does not appear 
> in any of the `*-site.xml` files.
> when we print the conf that get initialized in `TrashPolicyDefault` we get:
> ```
> Configuration: core-default.xml, core-site.xml, yarn-default.xml, 
> yarn-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, 
> hdfs-site.xml, 
> org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@561f0431, 
> file:/hadoop03/yarn/local/usercache/.../hive-site.xml
> ```
> and:
> `fs.trash.interval=360 [programatically]`
> `fs.trash.classname=OtherTrashPolicy [programatically]`
>  
> any idea why `fs.trash.classname` works but `fs.trash.interval` doesn't?
> this seems maybe related to: -SPARK-9825.-
>  
>  






[jira] [Created] (SPARK-30739) unable to turn off Hadoop's trash feature

2020-02-05 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-30739:
--

 Summary: unable to turn off Hadoop's trash feature
 Key: SPARK-30739
 URL: https://issues.apache.org/jira/browse/SPARK-30739
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ohad Raviv


We're trying to turn off the `TrashPolicyDefault` in one of our Spark 
applications by setting `spark.hadoop.fs.trash.interval=0`, but it just stays 
`360` as configured in our cluster's `core-site.xml`.

Trying to debug it, we managed to set `spark.hadoop.fs.trash.classname=OtherTrashPolicy` and it worked. The main difference seems to be that `spark.hadoop.fs.trash.classname` does not appear in any of the `*-site.xml` files.

When we print the conf that gets initialized in `TrashPolicyDefault` we get:

{noformat}
Configuration: core-default.xml, core-site.xml, yarn-default.xml, yarn-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@561f0431, file:/hadoop03/yarn/local/usercache/.../hive-site.xml
{noformat}

and:

`fs.trash.interval=360 [programatically]`

`fs.trash.classname=OtherTrashPolicy [programatically]`

 

Any idea why `fs.trash.classname` works but `fs.trash.interval` doesn't?

This seems possibly related to -SPARK-9825.-
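
For reference, a sketch of how we pass these settings through Spark (values and the policy class name are illustrative; any `spark.hadoop.*` conf is copied into the Hadoop Configuration Spark builds on the driver):
{code:java}
import org.apache.spark.sql.SparkSession

// "com.example.NoopTrashPolicy" is a hypothetical class used for illustration
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.trash.interval", "0")
  .config("spark.hadoop.fs.trash.classname", "com.example.NoopTrashPolicy")
  .getOrCreate()

// the driver-side view of the merged Hadoop configuration
println(spark.sparkContext.hadoopConfiguration.get("fs.trash.interval"))
println(spark.sparkContext.hadoopConfiguration.get("fs.trash.classname"))
{code}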

 

 






[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-05-22 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845559#comment-16845559
 ] 

Ohad Raviv commented on SPARK-18748:


[~kelemen] - thanks for sharing.

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> that causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations like predicate pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?






[jira] [Closed] (SPARK-16820) Sparse - Sparse matrix multiplication

2019-05-22 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv closed SPARK-16820.
--

resolved.

> Sparse - Sparse matrix multiplication
> -
>
> Key: SPARK-16820
> URL: https://issues.apache.org/jira/browse/SPARK-16820
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.0.0
>Reporter: Ohad Raviv
>Priority: Major
>  Labels: bulk-closed
>
> While working on MCL implementation on Spark we have encountered some 
> difficulties.
> The main part of this process is distributed sparse matrix multiplication 
> that has two main steps:
> 1.Simulate multiply – preparation before the real multiplication in order 
> to see which blocks should be multiplied.
> 2.The actual blocks multiplication and summation.
> In our case the sparse matrix has 50M rows and columns, and 2B non-zeros.
> The current multiplication suffers from these issues:
> 1.A relatively trivial bug already fixed in the first step the caused the 
> process to be very slow [SPARK-16469]
> 2.Still after the bug fix, if we have too many blocks the Simulate 
> multiply will take very long time and will multiply the data many times. 
> (O(n^3) where n is the number of blocks)
> 3.Spark supports only multiplication with Dense matrices. Thus, it 
> converts a Sparse matrix into a dense matrix before the multiplication.
> 4.For summing the intermediate block results Spark uses Breeze’s CSC 
> matrix operations – here the problem is that it is very inefficient to update 
> a CSC matrix in a zero value.
> That means that with many blocks (default block size is 1024) – in our case 
> 50M/1024 ~= 50K, the simulate multiply will effectively never finish or will 
> generate 50K*16GB ~= 1000TB of data. On the other hand, if we use bigger 
> block size e.g. 100k we get OutOfMemoryException in the “toDense” method of 
> the multiply. We have worked around that by implementing our-selves both the 
> Sparse multiplication and addition in a very naïve way – but at least it 
> works.






[jira] [Commented] (SPARK-16820) Sparse - Sparse matrix multiplication

2019-05-22 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845557#comment-16845557
 ] 

Ohad Raviv commented on SPARK-16820:


This issue was resolved by SPARK-19368 and SPARK-16469.

> Sparse - Sparse matrix multiplication
> -
>
> Key: SPARK-16820
> URL: https://issues.apache.org/jira/browse/SPARK-16820
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.0.0
>Reporter: Ohad Raviv
>Priority: Major
>  Labels: bulk-closed
>
> While working on MCL implementation on Spark we have encountered some 
> difficulties.
> The main part of this process is distributed sparse matrix multiplication 
> that has two main steps:
> 1.Simulate multiply – preparation before the real multiplication in order 
> to see which blocks should be multiplied.
> 2.The actual blocks multiplication and summation.
> In our case the sparse matrix has 50M rows and columns, and 2B non-zeros.
> The current multiplication suffers from these issues:
> 1.A relatively trivial bug already fixed in the first step the caused the 
> process to be very slow [SPARK-16469]
> 2.Still after the bug fix, if we have too many blocks the Simulate 
> multiply will take very long time and will multiply the data many times. 
> (O(n^3) where n is the number of blocks)
> 3.Spark supports only multiplication with Dense matrices. Thus, it 
> converts a Sparse matrix into a dense matrix before the multiplication.
> 4.For summing the intermediate block results Spark uses Breeze’s CSC 
> matrix operations – here the problem is that it is very inefficient to update 
> a CSC matrix in a zero value.
> That means that with many blocks (default block size is 1024) – in our case 
> 50M/1024 ~= 50K, the simulate multiply will effectively never finish or will 
> generate 50K*16GB ~= 1000TB of data. On the other hand, if we use bigger 
> block size e.g. 100k we get OutOfMemoryException in the “toDense” method of 
> the multiply. We have worked around that by implementing our-selves both the 
> Sparse multiplication and addition in a very naïve way – but at least it 
> works.






[jira] [Commented] (SPARK-27707) Performance issue using explode

2019-05-14 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839544#comment-16839544
 ] 

Ohad Raviv commented on SPARK-27707:


[~cloud_fan] - any chance you can take a look?

> Performance issue using explode
> ---
>
> Key: SPARK-27707
> URL: https://issues.apache.org/jira/browse/SPARK-27707
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Ohad Raviv
>Priority: Major
>
> this is a corner case of SPARK-21657.
> we have a case where we want to explode array inside a struct and also keep 
> some other columns of the struct. we again encounter a huge performance issue.
> reconstruction code:
> {code}
> val df = spark.sparkContext.parallelize(Seq(("1",
>   Array.fill(M)({
> val i = math.random
> (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
>   }.toDF("col", "arr")
>   .selectExpr("col", "struct(col, arr) as st")
>   .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
> df.write.mode("overwrite").save("/tmp/blah")
> {code}
> a workaround is projecting before the explode:
> {code}
> val df = spark.sparkContext.parallelize(Seq(("1",
>   Array.fill(M)({
> val i = math.random
> (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
>   }.toDF("col", "arr")
>   .selectExpr("col", "struct(col, arr) as st")
>   .withColumn("col1", $"st.col")
>   .selectExpr("col", "col1", "explode(st.arr) as arr_col")
> df.write.mode("overwrite").save("/tmp/blah")
> {code}
> in this case the optimization done in SPARK-21657:
> {code}
> // prune unrequired references
> case p @ Project(_, g: Generate) if p.references != g.outputSet =>
>   val requiredAttrs = p.references -- g.producedAttributes ++ 
> g.generator.references
>   val newChild = prunedChild(g.child, requiredAttrs)
>   val unrequired = g.generator.references -- p.references
>   val unrequiredIndices = newChild.output.zipWithIndex.filter(t => 
> unrequired.contains(t._1))
> .map(_._2)
>   p.copy(child = g.copy(child = newChild, unrequiredChildIndex = 
> unrequiredIndices))
> {code}
> doesn't work because `p.references` has whole the `st` struct as reference 
> and not just the projected field.
> this causes the entire struct including the huge array field to get 
> duplicated as the number of array elements.
> I know this is kind of a corner case but was really non trivial to 
> understand..






[jira] [Created] (SPARK-27707) Performance issue using explode

2019-05-14 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-27707:
--

 Summary: Performance issue using explode
 Key: SPARK-27707
 URL: https://issues.apache.org/jira/browse/SPARK-27707
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3, 3.0.0
Reporter: Ohad Raviv


This is a corner case of SPARK-21657.
We have a case where we want to explode an array inside a struct and also keep some other columns of the struct. We again encounter a huge performance issue.
Reproduction code:
{code}
import spark.implicits._

val M = 100000  // array size per row; concrete value assumed for the repro

val df = spark.sparkContext.parallelize(Seq(("1",
  Array.fill(M)({
    val i = math.random
    (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
  }))))
  .toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")

df.write.mode("overwrite").save("/tmp/blah")
{code}

A workaround is projecting before the explode:
{code}
// same imports and M as above
val df = spark.sparkContext.parallelize(Seq(("1",
  Array.fill(M)({
    val i = math.random
    (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
  }))))
  .toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .withColumn("col1", $"st.col")
  .selectExpr("col", "col1", "explode(st.arr) as arr_col")

df.write.mode("overwrite").save("/tmp/blah")
{code}

In this case the optimization done in SPARK-21657:
{code}
// prune unrequired references
case p @ Project(_, g: Generate) if p.references != g.outputSet =>
  val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
  val newChild = prunedChild(g.child, requiredAttrs)
  val unrequired = g.generator.references -- p.references
  val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
    .map(_._2)
  p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
{code}

doesn't work, because `p.references` contains the whole `st` struct and not just the projected field.
This causes the entire struct, including the huge array field, to be duplicated as many times as there are array elements.

I know this is kind of a corner case, but it was really non-trivial to understand..







[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16799163#comment-16799163
 ] 

Ohad Raviv commented on SPARK-18748:


[~nimfadora] - thanks, we actually also ended up using this workaround. However, that's really not a good long-term solution. We also tried disabling whole-stage code generation, and that also works.
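
For reference, the whole-stage-codegen switch mentioned above (session-scoped; it trades the duplicate UDF evaluation for the slower non-generated execution path):
{code:java}
// disable whole-stage code generation for the current session
spark.conf.set("spark.sql.codegen.wholeStage", "false")
{code}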

 

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> that causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations like predicate pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?






[jira] [Created] (SPARK-26645) CSV infer schema bug infers decimal(9,-1)

2019-01-17 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-26645:
--

 Summary: CSV infer schema bug infers decimal(9,-1)
 Key: SPARK-26645
 URL: https://issues.apache.org/jira/browse/SPARK-26645
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ohad Raviv


We have a file /tmp/t1/file.txt that contains only one line, "1.18927098E9".
Running:
{code:python}
df = spark.read.csv('/tmp/t1', header=False, inferSchema=True, sep='\t')
print df.dtypes
{code}

causes:
{noformat}
ValueError: Could not parse datatype: decimal(9,-1)
{noformat}

I'm not sure where the bug is - inferSchema or dtypes?
I saw that a decimal with a negative scale is considered legal in the code (CSVInferSchema.scala):
{code:java}
if (bigDecimal.scale <= 0) {
  // `DecimalType` conversion can fail when
  //   1. The precision is bigger than 38.
  //   2. scale is bigger than precision.
  DecimalType(bigDecimal.precision, bigDecimal.scale)
}
{code}
But what does it mean?
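
For what it's worth, a small illustration (not from the ticket) of where decimal(9,-1) comes from for this particular value:
{code:python}
# "1.18927098E9" parsed as a big decimal has 9 significant digits and a
# positive exponent, which in java.math.BigDecimal terms means precision=9,
# scale=-1 -- exactly what the inference code above passes through.
from decimal import Decimal

d = Decimal("1.18927098E9")
print(d.as_tuple())  # DecimalTuple(sign=0, digits=(1,1,8,9,2,7,0,9,8), exponent=1)
{code}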






[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-01-06 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735492#comment-16735492
 ] 

Ohad Raviv commented on SPARK-18748:


We're encountering this same problem once again with Spark Structured Streaming.
The typical way to read and parse is something like:

{code:java}
 spark.read
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", topic)
.load()
.select(parsingUDF(col("value")).as("parsed_struct"))
.selectExpr("parsed_struct.*")
{code}

and the ".*" expansion causes the UDF to run as many times as the number of columns in the struct. We typically have dozens of columns, meaning dozens of parses per incoming message.
Here we can't use any of the bypass solutions mentioned above and in SPARK-17728, as ".cache" and ".rdd" are unsupported operations on a Structured Streaming DataFrame.
[~cloud_fan], [~hvanhovell] - maybe you have an idea for a workaround also in the streaming case?
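
Not from the thread, but when the payload is JSON one streaming-compatible mitigation is to parse with the built-in `from_json` instead of an opaque UDF; unlike a UDF, a deterministic built-in expression is at least eligible for common-subexpression elimination. The schema below is illustrative, and `brokerAddress`/`topic` are assumed to be defined as in the snippet above:
{code:java}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field1", StringType)
  .add("field2", LongType)   // ...dozens of fields in the real use case

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", topic)
  .load()
  .select(from_json(col("value").cast("string"), schema).as("parsed_struct"))
  .select("parsed_struct.*")
{code}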



> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> that causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations like predicate pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?






[jira] [Updated] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-01-06 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-18748:
---
Affects Version/s: (was: 1.6.1)
   2.3.0
   2.4.0

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> that causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations like predicate pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?






[jira] [Created] (SPARK-26070) another implicit type coercion bug

2018-11-15 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-26070:
--

 Summary: another implicit type coercion bug
 Key: SPARK-26070
 URL: https://issues.apache.org/jira/browse/SPARK-26070
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0, 2.3.0
Reporter: Ohad Raviv


It looks like SPARK-22469 introduced a major bug into our system:
{code}
spark.sql("select '' = BD").show()
spark.sql("select '2224' = 2223BD").show()
{code}
which results in:
{noformat}
+-+
|(CAST( AS DOUBLE) = CAST( AS DOUBLE))|
+-+
| true|
+-+

+-+
|(CAST(2224 AS DOUBLE) = CAST(2223 AS DOUBLE))|
+-+
| true|
+-+
{noformat}
This causes downstream transformations to join together unrelated rows just 
because their ids are close.
[~cloud_fan], [~liutang123] - could you please explain this remark further:
{noformat}
// There is no proper decimal type we can pick,
// using double type is the best we can do.
// See SPARK-22469 for details.
{noformat}
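For context, a minimal illustration of why the double fallback merges distinct 
values (the 19-digit literals below are made up for illustration, not taken from 
the report): a Double keeps only about 15-16 significant decimal digits, so after 
the cast both sides round to the same value.
{code:java}
// Illustration only -- the 19-digit values are made up, not the report's data.
val a = "2222222222222222224".toDouble
val b = "2222222222222222223".toDouble
println(a == b)   // prints true: both literals round to the same Double
{code}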




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25963) Optimize generate followed by window

2018-11-07 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-25963:
--

 Summary: Optimize generate followed by window
 Key: SPARK-25963
 URL: https://issues.apache.org/jira/browse/SPARK-25963
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0, 2.4.0
Reporter: Ohad Raviv


We've noticed that in our use cases, when we have an explode followed by a window 
function, we can almost always optimize it by repartitioning by the window's 
partition columns before the explode.

for example:
{code:java}
import org.apache.spark.sql.functions._
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val tokens = spark.range(N).selectExpr(
"floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
// .repartition(col("key"))
.selectExpr("*", "explode(split(url, '/')) as token")

import org.apache.spark.sql.expressions._

val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))

res.explain(true)
{code}

{noformat}
== Optimized Logical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS cnt#17L], [key#6L, token#11]
+- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
   +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, 
asd/asd/asd/asd/asd/asd AS url#7]
  +- Range (0, 4096, step=1, splits=Some(1))
{noformat}

Currently all the data is exploded in the first stage, then shuffled and 
then aggregated.
We can achieve exactly the same computation if we first shuffle the data and, in 
the second stage, explode and aggregate.
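
A minimal sketch of that manual rewrite, using the mock above (this is my reading 
of the proposed rule, not the PR itself): repartition by the window's partition 
column that already exists before the explode ("key"). Hash-partitioning on a 
subset of the window's partition columns satisfies its required distribution, so 
the window no longer needs its own exchange.
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Shuffle the small, un-exploded rows by "key" first, then explode; the window
// over (key, token) is then satisfied by the existing hashpartitioning(key),
// so only one exchange appears in the plan.
val preShuffled = spark.range(N)
  .selectExpr("floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
  .repartition(col("key"))
  .selectExpr("*", "explode(split(url, '/')) as token")

val w2 = Window.partitionBy("key", "token")
preShuffled.withColumn("cnt", count("token").over(w2)).explain(true)
{code}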

I have a PR that tries to resolve this. I'm just not sure I thought about all 
the cases.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-25951) Redundant shuffle if column is renamed

2018-11-06 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-25951:
---
Description: 
we've noticed that sometimes a column rename causes extra shuffle:
{code:java}
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

import org.apache.spark.sql.functions._
t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
.join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
 "key3"),
col("key1")===col("key3"))
.explain()
{code}
results in:
{noformat}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
:  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, 
cnt1#14L])
: +- Exchange hashpartitioning(key1#6L, 2)
:+- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], 
output=[key1#6L, count#39L])
:   +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
:  +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
: +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key3#22L, 2)
  +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], 
output=[key3#22L, cnt2#19L])
 +- Exchange hashpartitioning(key2#10L, 2)
+- *(3) HashAggregate(keys=[key2#10L], 
functions=[partial_count(1)], output=[key2#10L, count#41L])
   +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS 
key2#10L]
  +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
 +- *(3) Range (0, 4096, step=1, splits=1)
{noformat}
I was able to track it down to this code in class HashPartitioning:
{code:java}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
  case (l, r) => l.semanticEquals(r)
 }
{code}
The semanticEquals check returns false, as it compares key2 and key3 even though 
key3 is just a rename of key2.

  was:
we've noticed that sometimes a column rename causes extra shuffle:
{code}
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
.join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
 "key3"),
col("key1")===col("key3"))
.explain()
{code}

results in:

{noformat}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
:  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, 
cnt1#14L])
: +- Exchange hashpartitioning(key1#6L, 2)
:+- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], 
output=[key1#6L, count#39L])
:   +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
:  +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
: +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key3#22L, 2)
  +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], 
output=[key3#22L, cnt2#19L])
 +- Exchange hashpartitioning(key2#10L, 2)
+- *(3) HashAggregate(keys=[key2#10L], 
functions=[partial_count(1)], output=[key2#10L, count#41L])
   +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS 
key2#10L]
  +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
 +- *(3) Range (0, 4096, step=1, splits=1)
{noformat}
I was able to track it down to this code in class HashPartitioning:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
  case (l, r) => l.semanticEquals(r)
 }
{code}
The semanticEquals check returns false, as it compares key2 and key3 even though 
key3 is just a rename of key2.


> Redundant shuffle if column is renamed
> --
>
> Key: SPARK-25951
> URL: https://issues.apache.org/jira/browse/SPARK-25951
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> we've noticed that sometimes a column rename causes extra shuffle:
> {code:java}
> val N = 1 << 12
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
> val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
> val t2 = spark.range(N).selectExpr("floor(id/4) as key2")
> import org.apache.spark.sql.functions._

[jira] [Updated] (SPARK-25951) Redundant shuffle if column is renamed

2018-11-06 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-25951:
---
Description: 
we've noticed that sometimes a column rename causes extra shuffle:
{code}
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
.join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
 "key3"),
col("key1")===col("key3"))
.explain()
{code}

results in:

{noformat}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
:  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, 
cnt1#14L])
: +- Exchange hashpartitioning(key1#6L, 2)
:+- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], 
output=[key1#6L, count#39L])
:   +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
:  +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
: +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key3#22L, 2)
  +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], 
output=[key3#22L, cnt2#19L])
 +- Exchange hashpartitioning(key2#10L, 2)
+- *(3) HashAggregate(keys=[key2#10L], 
functions=[partial_count(1)], output=[key2#10L, count#41L])
   +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS 
key2#10L]
  +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
 +- *(3) Range (0, 4096, step=1, splits=1)
{noformat}
I was able to track it down to this code in class HashPartitioning:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
  case (l, r) => l.semanticEquals(r)
 }
{code}
The semanticEquals check returns false, as it compares key2 and key3 even though 
key3 is just a rename of key2.

  was:
we've noticed that sometimes a column rename causes extra shuffle:
{code}
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
.join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
 "key3"),
col("key1")===col("key3"))
.explain(true)
{code}

results in:

{code}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
: +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, 
cnt1#14L])
: +- Exchange hashpartitioning(key1#6L, 2)
: +- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], 
output=[key1#6L, count#39L])
: +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
: +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
: +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key3#22L, 2)
+- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, 
cnt2#19L])
+- Exchange hashpartitioning(key2#10L, 2)
+- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], 
output=[key2#10L, count#41L])
+- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L]
+- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
+- *(3) Range (0, 4096, step=1, splits=1)
{code}
I was able to track it down to this code in class HashPartitioning:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
  case (l, r) => l.semanticEquals(r)
 }
{code}
The semanticEquals check returns false, as it compares key2 and key3 even though 
key3 is just a rename of key2.


> Redundant shuffle if column is renamed
> --
>
> Key: SPARK-25951
> URL: https://issues.apache.org/jira/browse/SPARK-25951
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> we've noticed that sometimes a column rename causes extra shuffle:
> {code}
> val N = 1 << 12
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
> val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
> val t2 = spark.range(N).selectExpr("floor(id/4) as key2")
> t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
> .join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
>  "key3"),
> col("key1")===col("key3"))
> .explain()
> {code}
> results in:
> {noformat}
> == 

[jira] [Created] (SPARK-25951) Redundant shuffle if column is renamed

2018-11-06 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-25951:
--

 Summary: Redundant shuffle if column is renamed
 Key: SPARK-25951
 URL: https://issues.apache.org/jira/browse/SPARK-25951
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ohad Raviv


we've noticed that sometimes a column rename causes extra shuffle:
{code}
val N = 1 << 12

spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")

val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")

t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
.join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2",
 "key3"),
col("key1")===col("key3"))
.explain(true)
{code}

results in:

{code}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
: +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, 
cnt1#14L])
: +- Exchange hashpartitioning(key1#6L, 2)
: +- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], 
output=[key1#6L, count#39L])
: +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
: +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
: +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key3#22L, 2)
+- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, 
cnt2#19L])
+- Exchange hashpartitioning(key2#10L, 2)
+- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], 
output=[key2#10L, count#41L])
+- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L]
+- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
+- *(3) Range (0, 4096, step=1, splits=1)
{code}
I was able to track it down to this code in class HashPartitioning:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && 
expressions.zip(h.expressions).forall {
  case (l, r) => l.semanticEquals(r)
 }
{code}
The semanticEquals check returns false, as it compares key2 and key3 even though 
key3 is just a rename of key2.
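
Until this is addressed, a simple workaround sketch (my suggestion, reusing t1 and 
t2 from the example above): keep the original column name through the join so the 
aggregation's hashpartitioning(key2) still matches the join key, and rename only 
after the join.
{code}
import org.apache.spark.sql.functions._

// Join on "key2" (no rename before the join) so the exchange introduced by the
// aggregation is reused by the sort-merge join; rename to "key3" afterwards.
val left  = t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
val right = t2.groupBy("key2").agg(count(lit("1")).as("cnt2"))

left.join(right, col("key1") === col("key2"))
  .withColumnRenamed("key2", "key3")
  .explain()
{code}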



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625448#comment-16625448
 ] 

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:15 AM:
-

{quote}You should move where("a>'1'") before withColumn:
{quote}
This is exactly the issue I've opened.
The Optimizer should understand this on its own.

I understand my original mistake in the example and have changed it. Try now.
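
For reference, a sketch of that workaround on the DataFrame side (assuming the same 
t1 table as in the snippets below): apply the filter before adding the window 
column, so the predicate can reach the file scan.
{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Filter first, then add the window column: the predicate is pushed into the
// Parquet scan instead of being blocked above the Window operator.
val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")

sparkSession.table("t1")
  .where("a > '1'")
  .withColumn("d", row_number() over windowSpec)
  .explain()
{code}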


was (Author: uzadude):
{quote}You should move where("a>'1'") before withColumn:{quote}

this is exactly the issue I've opened.
the Optimizer should understand this on its own.

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select * from (
>select *, row_number() over (partition by a order by b) from t1
> )z 
> where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
>select *, row_number() over (partition by concat(a,'lit') order by b) from 
> t1
> )z 
> where a>1
> {code}
>  
>  I added a test to FilterPushdownSuite which I think recreates the problem:
> {code}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-23985:
---
Description: 
while predicate push down works with this query: 
{code:sql}
select * from (
   select *, row_number() over (partition by a order by b) from t1
)z 
where a>1
{code}
it doesn't work with:
{code:sql}
select * from (
   select *, row_number() over (partition by concat(a,'lit') order by b) from t1
)z 
where a>1
{code}
 
 I added a test to FilterPushdownSuite which I think recreates the problem:
{code}
  test("Window: predicate push down -- ohad") {
val winExpr = windowExpr(count('b),
  windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, 
winExpr.as('window)).where('a > 1)
val correctAnswer = testRelation
  .where('a > 1).select('a, 'b, 'c)
  .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
  .select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
  }
{code}
will try to create a PR with a correction

  was:
while predicate push down works with this query: 
{code:sql}
select *, row_number() over (partition by a order by b) from t1 where a>1
{code}
it doesn't work with:
{code:sql}
select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
where a>1
{code}
 
I added a test to FilterPushdownSuite which I think recreates the problem:
{code:scala}
  test("Window: predicate push down -- ohad") {
val winExpr = windowExpr(count('b),
  windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, 
winExpr.as('window)).where('a > 1)
val correctAnswer = testRelation
  .where('a > 1).select('a, 'b, 'c)
  .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
  .select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
  }
{code}

will try to create a PR with a correction



> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select * from (
>select *, row_number() over (partition by a order by b) from t1
> )z 
> where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
>select *, row_number() over (partition by concat(a,'lit') order by b) from 
> t1
> )z 
> where a>1
> {code}
>  
>  I added a test to FilterPushdownSuite which I think recreates the problem:
> {code}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625448#comment-16625448
 ] 

Ohad Raviv commented on SPARK-23985:


{quote}You should move where("a>'1'") before withColumn:{quote}

this is exactly the issue I've opened.
the Optimizer should understand this on its own.

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
> where a>1
> {code}
>  
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625436#comment-16625436
 ] 

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:07 AM:
-

the same is true for Spark 2.4:
{code}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", 
"id").write.saveAsTable("t1")
val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from 
t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, 
lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS 
FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(_w0#23, 1)
 +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
+- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: 
struct


== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], 
[_w0#29], [b#12L ASC NULLS FIRST]
  +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(_w0#29, 1)
+- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct
{noformat}


was (Author: uzadude):
the same is true for Spark 2.4:
{code}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", 
"id").write.saveAsTable("t1")
val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from 
t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{code}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, 
lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS 
FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(_w0#23, 1)
 +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
+- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: 
struct


== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], 
[_w0#29], [b#12L ASC NULLS FIRST]
  +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(_w0#29, 1)
+- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct
{code}

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> 

[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625436#comment-16625436
 ] 

Ohad Raviv commented on SPARK-23985:


the same is true for Spark 2.4:
{code}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", 
"id").write.saveAsTable("t1")
val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from 
t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{code}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, 
lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS 
FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(_w0#23, 1)
 +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
+- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: 
struct


== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], 
[_w0#29], [b#12L ASC NULLS FIRST]
  +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(_w0#29, 1)
+- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
   +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct
{code}

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
> where a>1
> {code}
>  
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-09-24 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625422#comment-16625422
 ] 

Ohad Raviv commented on SPARK-23985:


You're right, that's very strange; it looks like something got lost in 
translation.
When I run your example (which is actually mine...) I indeed get the right 
plan. However, if I try my original code it is still the un-optimized plan 
(with Spark 2.3):
{code}
import org.apache.spark.sql.functions._
spark.range(10).selectExpr(
  "cast(id as string) a",
  "id as b").write.saveAsTable("t1")
val windowSpec = Window.partitionBy(concat(col("a"), 
lit("lit"))).orderBy("b")
spark.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
{code}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- *(3) Filter (isnotnull(a#8) && (a#8 > 1))
   +- Window [row_number() windowspecdefinition(_w0#14, b#9L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], 
[_w0#14], [b#9L ASC NULLS FIRST]
  +- *(2) Sort [_w0#14 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(_w0#14, 2)
+- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#14]
   +- *(1) FileScan parquet unitest.t1[a#8,b#9L] Batched: true, 
Format: Parquet, Location: InMemoryFileIndex[../t1], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct
{code}
Can you tell what accounts for the difference?

 

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
> where a>1
> {code}
>  
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-07-08 Thread Ohad Raviv (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-24528:
---
Description: 
Closely related to SPARK-24410, we're trying to optimize a very common use case 
we have of getting the most updated row by id from a fact table.

We're saving the table bucketed to skip the shuffle stage, but we still 
"waste" time on the Sort operator even though the data is already sorted.

here's a good example:
{code:java}
sparkSession.range(N).selectExpr(
  "id as key",
  "id % 2 as t1",
  "id % 3 as t2")
.repartition(col("key"))
.write
  .mode(SaveMode.Overwrite)
.bucketBy(3, "key")
.sortBy("key", "t1")
.saveAsTable("a1"){code}
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, 
key, key#24L, t1, t1#25L, t2, t2#26L))])
+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
Format: Parquet, Location: ...{code}
 

and here's a bad example, but more realistic:
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, 
key, key#32L, t1, t1#33L, t2, t2#34L))])
+- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
+- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
Format: Parquet, Location: ...

{code}
 

I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns 
set
  // Current solution is to check if all the buckets have a single file in it

  val files = selectedPartitions.flatMap(partition => partition.files)
  val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => 
BucketingUtils.getBucketId(file))
  val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
1){code}
So obviously the code avoids dealing with this situation for now.

Could you think of a way to solve this or bypass it?

  was:
Closely related to  SPARK-24410, we're trying to optimize a very common use 
case we have of getting the most updated row by id from a fact table.

We're saving the table bucketed to skip the shuffle stage, but we still 
"waste" time on the Sort operator even though the data is already sorted.

here's a good example:
{code:java}
sparkSession.range(N).selectExpr(
  "id as key",
  "id % 2 as t1",
  "id % 3 as t2")
.repartition(col("key"))
.write
  .mode(SaveMode.Overwrite)
.bucketBy(3, "key")
.sortBy("key", "t1")
.saveAsTable("a1"){code}
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, 
key, key#24L, t1, t1#25L, t2, t2#26L))])
+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
Format: Parquet, Location: ...{code}
 

and here's a bad example, but more realistic:
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, 
key, key#32L, t1, t1#33L, t2, t2#34L))])
+- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
+- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
Format: Parquet, Location: ...

{code}
 

I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns 
set
  // Current solution is to check if all the buckets have a single file in it

  val files = selectedPartitions.flatMap(partition => partition.files)
  

[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-07-05 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1659#comment-1659
 ] 

Ohad Raviv commented on SPARK-24528:


After digging a little bit into the code and Jira, I understand that this is just 
a special case of SPARK-2926, just that the performance boost is greater.

Over there they deal with moving the sort work from reducers to mappers and 
show a reducer performance boost of ~10x and an overall performance boost of ~2x 
(I'm not sure why it never got merged). In our case, because the data is 
already sorted in the buckets, we should expect this great 10x boost!

Because most of the needed code is already in there, I guess it would be wise to 
migrate it (although it contains some fancier things, like the Tiered Merger, that 
I'm not sure we need).

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-26 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523344#comment-16523344
 ] 

Ohad Raviv commented on SPARK-24528:


Hi,

Well, it took me some time to get to it, but here are my design conclusions:
 # Currently all the file scans are done with FileScanRDD. In its current 
implementation it gets a list of files in each partition and iterates them one 
after the other.
 # That means we probably need another FileScanRDD that can "open" all the 
files and iterate them in a merge-sort manner (e.g. maintaining a heap to know 
which file to read from next) - see the sketch after this comment.
 # The FileScanRDD is created in FileSourceScanExec.createBucketedReadRDD if 
the data is bucketed.
 # FileSourceScanExec is created in FileSourceStrategy.
 # That means we could determine in FileSourceStrategy whether the read output is 
required to be sorted, and percolate this knowledge down to the creation of 
the new FileScan(Sorted?)RDD.
 # One thing to note is that we should enable this sorted reading only when it's 
required; otherwise it will cause a performance regression.

Please tell me WDYT.
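
To make point 2 concrete, here is a small sketch of reading several locally sorted 
files in a merge-sort manner with a heap. This is illustrative only, not the actual 
FileScanRDD API; mergeSorted and the per-file iterators are assumptions.
{code:java}
import scala.collection.mutable

// Sketch only: k-way merge of already-sorted per-file iterators via a heap,
// so the combined stream comes out globally sorted without an extra Sort.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // min-heap keyed by each iterator's current head (PriorityQueue is a max-heap,
  // hence the reversed ordering)
  val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
    Ordering.by[(T, Iterator[T]), T](_._1).reverse)
  iters.filter(_.hasNext).foreach(it => heap.enqueue((it.next(), it)))

  new Iterator[T] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): T = {
      val (value, it) = heap.dequeue()      // smallest current head across files
      if (it.hasNext) heap.enqueue((it.next(), it))
      value
    }
  }
}

// e.g. three "bucket files", each locally sorted by key:
// mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9))).toList
// => List(1, 2, 3, 4, 5, 6, 7, 8, 9)
{code}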

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-13 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511578#comment-16511578
 ] 

Ohad Raviv commented on SPARK-24528:


I think the 2nd point better suits my use case. I'll try to look into it.

Thanks.

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-13 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511483#comment-16511483
 ] 

Ohad Raviv commented on SPARK-24528:


I understand the tradeoff; the question is how we could leverage the local file 
sorting. I'm sure the extra sort adds significant overhead: we still have 
to read all the data into memory, spill, etc.

If we could push down the sorting to the DataSourceScanExec - so that instead of 
reading the files one after another we merge-stream them in the right order -

I'm sure it would be much more effective.

With that I'm trying to imitate HBase and the way it dedupes by key.

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-12 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509465#comment-16509465
 ] 

Ohad Raviv commented on SPARK-24528:


[~cloud_fan], [~viirya] - Hi, I found a somewhat similar issue to [SPARK-24410]; 
I would really appreciate it if you could tell me what you think.

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-12 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-24528:
--

 Summary: Missing optimization for Aggregations/Windowing on a 
bucketed table
 Key: SPARK-24528
 URL: https://issues.apache.org/jira/browse/SPARK-24528
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0, 2.4.0
Reporter: Ohad Raviv


Closely related to SPARK-24410, we're trying to optimize a very common use case we have: getting the most up-to-date row per id from a fact table.

We save the table bucketed to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted.

here's a good example:
{code:java}
sparkSession.range(N).selectExpr(
  "id as key",
  "id % 2 as t1",
  "id % 3 as t2")
.repartition(col("key"))
.write
  .mode(SaveMode.Overwrite)
.bucketBy(3, "key")
.sortBy("key", "t1")
.saveAsTable("a1"){code}
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, 
key, key#24L, t1, t1#25L, t2, t2#26L))])
+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
Format: Parquet, Location: ...{code}
 

and here's a bad example, but more realistic:
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, 
key, key#32L, t1, t1#33L, t2, t2#34L))])
+- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
+- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
Format: Parquet, Location: ...

{code}
 

I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns 
set
  // Current solution is to check if all the buckets have a single file in it

  val files = selectedPartitions.flatMap(partition => partition.files)
  val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => 
BucketingUtils.getBucketId(file))
  val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
1){code}
So obviously the code avoids dealing with this situation for now.

Could you think of a way to solve this or bypass it? One possible bypass is sketched below.
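A minimal sketch of one possible bypass (an assumption on my side, not a verified fix): the check above only trusts the per-bucket sort when every bucket is backed by a single file, so repartitioning by the bucket column into exactly numBuckets partitions before the bucketed write should leave one file per bucket and let the existing code reuse the sort order.
{code:java}
// Sketch only: force one file per bucket so the single-file check in DataSourceScanExec passes.
// Assumes the same N and sparkSession as in the example above.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(3, col("key"))   // 3 == numBuckets, so each task holds exactly one bucket
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1")
{code}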



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493561#comment-16493561
 ] 

Ohad Raviv commented on SPARK-24410:


[~sowen], [~cloud_fan] - could you please check if my assessment is correct? 
thanks!

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. we do this my unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-24410:
--

 Summary: Missing optimization for Union on bucketed tables
 Key: SPARK-24410
 URL: https://issues.apache.org/jira/browse/SPARK-24410
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ohad Raviv


A common use case we have is a partially aggregated table plus daily increments that we need to aggregate further. We do this by unioning the two tables and aggregating again.
We tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the bucketing the way the join operator does.

for example, for two bucketed tables a1,a2:

{code}
sparkSession.range(N).selectExpr(
  "id as key",
  "id % 2 as t1",
  "id % 3 as t2")
.repartition(col("key"))
.write
  .mode(SaveMode.Overwrite)
.bucketBy(3, "key")
.sortBy("t1")
.saveAsTable("a1")

sparkSession.range(N).selectExpr(
  "id as key",
  "id % 2 as t1",
  "id % 3 as t2")
  .repartition(col("key"))
  .write.mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("t1")
  .saveAsTable("a2")

{code}
for the join query we get the "SortMergeJoin"
{code}
select * from a1 join a2 on (a1.key=a2.key)

== Physical Plan ==
*(3) SortMergeJoin [key#24L], [key#27L], Inner
:- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
:  +- *(1) Project [key#24L, t1#25L, t2#26L]
: +- *(1) Filter isnotnull(key#24L)
:+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
PushedFilters: [IsNotNull(key)], ReadSchema: 
struct
+- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
   +- *(2) Project [key#27L, t1#28L, t2#29L]
  +- *(2) Filter isnotnull(key#27L)
 +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
PushedFilters: [IsNotNull(key)], ReadSchema: 
struct
{code}

but for aggregation after union we get a shuffle:
{code}
select key,count(*) from (select * from a1 union all select * from a2)z group 
by key

== Physical Plan ==
*(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
count(1)#36L])
+- Exchange hashpartitioning(key#25L, 1)
   +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
output=[key#25L, count#38L])
  +- Union
 :- *(1) Project [key#25L]
 :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: 
Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
 +- *(2) Project [key#28L]
+- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: 
Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
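One possible workaround sketch for the use case above (my assumption, not a proposed Spark change): since the join path does respect bucketing, the per-key counts of the two bucketed tables could be combined with a full outer join on the bucket key instead of union-all + group by. Both aggregates should preserve the bucketed hash partitioning on key, so the final Exchange may be avoided; the actual plan is worth verifying with explain.
{code}
// Sketch only: combine per-key counts of the two bucketed tables via a join on the bucket key.
import org.apache.spark.sql.functions._

val cnt1 = sparkSession.table("a1").groupBy("key").agg(count(lit(1)).as("cnt1"))
val cnt2 = sparkSession.table("a2").groupBy("key").agg(count(lit(1)).as("cnt2"))
val merged = cnt1.join(cnt2, Seq("key"), "full_outer")
  .select(col("key"),
    (coalesce(col("cnt1"), lit(0L)) + coalesce(col("cnt2"), lit(0L))).as("cnt"))
merged.explain(true)
{code}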





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-04-16 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16439114#comment-16439114
 ] 

Ohad Raviv commented on SPARK-23985:


I see in the Optimizer that filters get pushed down only if they appear in the partitionSpec exactly as-is.
Looks like we need to add some kind of property to Expression that indicates whether we can push a filter through it.
An even more trivial example than Concat could be Struct.
[~cloud_fan] - I see you worked on this code about a year ago; could you please take a look?

Ohad.

> predicate push down doesn't work with simple compound partition spec
> 
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Ohad Raviv
>Priority: Minor
>
> while predicate push down works with this query: 
> {code:sql}
> select *, row_number() over (partition by a order by b) from t1 where a>1
> {code}
> it dowsn't work with:
> {code:sql}
> select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
> where a>1
> {code}
>  
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code:scala}
>   test("Window: predicate push down -- ohad") {
> val winExpr = windowExpr(count('b),
>   windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
> val originalQuery = testRelation.select('a, 'b, 'c, 
> winExpr.as('window)).where('a > 1)
> val correctAnswer = testRelation
>   .where('a > 1).select('a, 'b, 'c)
>   .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>   .select('a, 'b, 'c, 'window).analyze
> comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
>   }
> {code}
> will try to create a PR with a correction



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-04-15 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-23985:
--

 Summary: predicate push down doesn't work with simple compound 
partition spec
 Key: SPARK-23985
 URL: https://issues.apache.org/jira/browse/SPARK-23985
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Ohad Raviv


while predicate push down works with this query: 
{code:sql}
select *, row_number() over (partition by a order by b) from t1 where a>1
{code}
it doesn't work with:
{code:sql}
select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
where a>1
{code}
 
I added a test to FilterPushdownSuite which I think recreates the problem:
{code:scala}
  test("Window: predicate push down -- ohad") {
val winExpr = windowExpr(count('b),
  windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, 
winExpr.as('window)).where('a > 1)
val correctAnswer = testRelation
  .where('a > 1).select('a, 'b, 'c)
  .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
  .select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
  }
{code}

I will try to create a PR with a correction.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22910) Wrong results in Spark Job because failed to move to Trash

2017-12-27 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-22910:
--

 Summary: Wrong results in Spark Job because failed to move to Trash
 Key: SPARK-22910
 URL: https://issues.apache.org/jira/browse/SPARK-22910
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0, 2.1.0
Reporter: Ohad Raviv


Our Spark job completed with a successful status although the saved data was corrupted.

What happened is that we have a monthly job; each run overwrites the output of the previous run. We happened to change the sql.shuffle.partitions number between the runs from 2000 to 1000, and the new run hit a WARN-level failure while moving the old data to the user's .Trash because it was full. Because it was only a warning, the process continued and wrote the new 1000 files while leaving the remaining old files in their place. As a result, the final output folder contained a mix of old and new data, which corrupted the downstream process.
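A rough guard sketch (my assumption, not an existing Spark or Hive option) would be to fail fast after the overwrite if the output folder mixes files whose modification times are far apart, which is exactly the old/new mix described above; the path and threshold here are illustrative only.
{code}
// Sketch only: detect an output folder that mixes files written hours apart.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val times = fs.listStatus(new Path("/the/folder")).filter(_.isFile).map(_.getModificationTime)
require(times.nonEmpty && times.max - times.min < 6 * 60 * 60 * 1000L,
  s"output looks like a mix of old and new files: ${times.min} .. ${times.max}")
{code}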

The post-mortem is relatively easy to understand:
{code}
hadoop fs -ls /the/folder
-rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 
/the/folder/part-0.gz 
.
.
-rwxr-xr-x 3 spark_user spark_user 34899 2017-11-17 06:39 
/the/folder/part-01990.gz 
{code}
and in the driver's log:
{code}
17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: 
java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-0.gz
java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-0.gz
at 
org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160)
at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109)
at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90)
at 
org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272)
at 
org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603)
at 
org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586)
at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851)
at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716)
at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672)
at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671)
at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741)
at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at 

[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-11-07 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241887#comment-16241887
 ] 

Ohad Raviv commented on SPARK-21657:


Hi,
I created a pull request: https://github.com/apache/spark/pull/19683
would appreciate if you could take a look.

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-30 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224420#comment-16224420
 ] 

Ohad Raviv commented on SPARK-21657:


After some debugging, I think I understand the tricky part here.
Because there are outer fields in the query, join=true is set on the Generate node, and because the generator uses the array as its child, the array column can't be removed from the Generate output.
I think that, because omitting the original column is so common, it would make sense to add another attribute to the Generate class, like {{omitChild: Boolean}}, and let the Optimizer turn it on with an appropriate rule.
What do you think?

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-29 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224374#comment-16224374
 ] 

Ohad Raviv commented on SPARK-21657:


ok i found the relevant rule:
{code:java|title=Optimizer.scala.java|borderStyle=solid}
// Turn off `join` for Generate if no column from it's child is used
case p @ Project(_, g: Generate)
if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
  p.copy(child = g.copy(join = false))
{code}
I'm not sure yet why it doesn't work.

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-29 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16224365#comment-16224365
 ] 

Ohad Raviv commented on SPARK-21657:


After further investigation I believe my assessment is correct: the former case creates a generator with join=true while the latter uses join=false, as you can see in the plans above (I also verified this in a debugger). This causes the very long array of size 100k to be duplicated 100k times and only pruned afterwards, because its column is not in the final projection.
I'm not sure what the best way to address this is - perhaps amend the Generate operator according to the projection.
In the meantime, in our case, I worked around it by manually adding the outer fields into each struct of the array and then exploding only the array (roughly as sketched below). It's an ugly solution but it reduces our query time from 6 hours to about 2 minutes.
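Roughly, the workaround looks like the sketch below (the column names c1/c_arr and the 4-string struct layout are taken from the repro in the comments below; the exact code may differ):
{code:java}
// Sketch only: push the outer column into each struct first, then explode the enriched array,
// so the generator output never has to carry the huge array column alongside it.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val addOuter = udf { (c1: String, arr: Seq[Row]) =>
  arr.map(r => (c1, r.getString(0), r.getString(1), r.getString(2), r.getString(3)))
}

val df_exploded = df
  .select(explode(addOuter(col("c1"), col("c_arr"))).as("c2"))
  .selectExpr("c2.*")
{code}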

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-29 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223946#comment-16223946
 ] 

Ohad Raviv commented on SPARK-21657:


Sure,
the plan for
{code:java}
val df_exploded = df.select(expr("c1"), 
explode($"c_arr").as("c2")).selectExpr("c1" ,"c2.*")
{code}
is 
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('c1, None), ArrayBuffer(c2).*]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
  +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
 +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
+- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
c1: string, _1: string, _2: string, _3: string, _4: string
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, 
c2#25._4 AS _4#43]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
  +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
 +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, 
c2#25._4 AS _4#43]
+- Generate explode(c_arr#7), true, false, [c2#25]
   +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
  +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), 

[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-29 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223902#comment-16223902
 ] 

Ohad Raviv commented on SPARK-21657:


I switched to toArray instead of toList in the above code and got an improvement by a factor of 2, but the main bottleneck remains.
Now the diff in the above example between:
{code:java}
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
{code}
and:
{code:java}
val df_exploded = df.select(explode($"c_arr").as("c2"))
{code}
is 128 secs vs. 3 secs.

Again I profiled the former and saw that all the time got consumed in:
org.apache.spark.unsafe.Platform.copyMemory()   97.548096   23,991 ms 
(97.5%)   

The obvious difference between the execution plans is that the former has two WholeStageCodegen plans and the latter just one.
I haven't fully understood the generated code, but I would guess that in the problematic case the generated explode code actually copies the long array into every exploded row and only prunes it at the end.
Please see if you can verify this or think of a workaround.



> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-27 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222312#comment-16222312
 ] 

Ohad Raviv edited comment on SPARK-21657 at 10/27/17 12:53 PM:
---

Hi,
Just ran a profiler for this code:
{code:java}
val BASE = 1
val N = 10
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => 
(x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList 
))).toDF("c1", "c_arr")
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
df_exploded.write.mode("overwrite").format("json").save("/tmp/blah_explode")
{code}

and it looks like [~srowen] is right, most of the time is spent in 
scala.collection.immutable.List.apply()   (72.1%). inside:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext()
  (100%)

I logged the generated code and found the problematic code:
{code:java}
 if (serializefromobject_funcResult1 != null) {
 serializefromobject_value5 = (scala.collection.immutable.List) 
serializefromobject_funcResult1;
   } else {
 serializefromobject_isNull5 = true;
 }
.
.
.
 while (serializefromobject_loopIndex < serializefromobject_dataLength) {
   MapObjects_loopValue0 = (scala.Tuple4) 
(serializefromobject_value5.apply(serializefromobject_loopIndex));
{code}

So that causes the quadratic time complexity.
However, I'm not sure where the code is that generates this List instead of an Array for the exploded array.


was (Author: uzadude):
Hi,
Just ran a profiler for this code:
{code:scala}
val BASE = 1
val N = 10
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => 
(x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList 
))).toDF("c1", "c_arr")
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
df_exploded.write.mode("overwrite").format("json").save("/tmp/blah_explode")
{code}

and it looks like [~srowen] is right, most of the time is spent in 
scala.collection.immutable.List.apply()   (72.1%). inside:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext()
  (100%)

I logged the generated code and found the problematic code:
{code:scala}
 if (serializefromobject_funcResult1 != null) {
 serializefromobject_value5 = (scala.collection.immutable.List) 
serializefromobject_funcResult1;
   } else {
 serializefromobject_isNull5 = true;
 }
.
.
.
 while (serializefromobject_loopIndex < serializefromobject_dataLength) {
   MapObjects_loopValue0 = (scala.Tuple4) 
(serializefromobject_value5.apply(serializefromobject_loopIndex));
{code}

so that causes the quadratic time complexity.
However, I'm not sure where is the code that generates this list instead of 
array for the exploded array.

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-27 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222312#comment-16222312
 ] 

Ohad Raviv commented on SPARK-21657:


Hi,
Just ran a profiler for this code:
{code:scala}
val BASE = 1
val N = 10
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => 
(x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList 
))).toDF("c1", "c_arr")
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
df_exploded.write.mode("overwrite").format("json").save("/tmp/blah_explode")
{code}

and it looks like [~srowen] is right, most of the time is spent in 
scala.collection.immutable.List.apply()   (72.1%). inside:
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext()
  (100%)

I logged the generated code and found the problematic code:
{code:scala}
 if (serializefromobject_funcResult1 != null) {
 serializefromobject_value5 = (scala.collection.immutable.List) 
serializefromobject_funcResult1;
   } else {
 serializefromobject_isNull5 = true;
 }
.
.
.
 while (serializefromobject_loopIndex < serializefromobject_dataLength) {
   MapObjects_loopValue0 = (scala.Tuple4) 
(serializefromobject_value5.apply(serializefromobject_loopIndex));
{code}

So that causes the quadratic time complexity.
However, I'm not sure where the code is that generates this List instead of an Array for the exploded array.

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-26 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220450#comment-16220450
 ] 

Ohad Raviv commented on SPARK-21657:


Hi,
I wanted to add that we're facing exactly the same issue: 6 hours of work for one row that contains a 250k-element array (of structs of 4 strings).
Just wanted to note that if we explode only the array, e.g., in your example:
cached_df = sqlc.sql('select explode(amft) from ' + table_name)

it finishes in about 3 minutes.
It happens in Spark 2.1 and also 2.2, even though SPARK-16998 was resolved.

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()

2017-01-27 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15842581#comment-15842581
 ] 

Ohad Raviv commented on SPARK-19368:


Well, not with the same elegant code. The main problem is that SparseVector is very inefficient to manipulate. From Breeze's site:
{quote}
You should not be adding lots of values to a SparseVector if you want good 
speed. SparseVectors have to maintain the invariant that the index array is 
always sorted, which makes insertions expensive.
{quote}
They suggest using VectorBuilder instead, but that is only good for SparseVector; with DenseVector the current implementation is better.
So if you want, I can just create two different functions for the sparse/dense cases (roughly as sketched below).
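A sketch of the sparse-path idea (buildSparseRow is a hypothetical helper, not the actual MLlib patch): accumulate one row's entries with Breeze's VectorBuilder and materialize a SparseVector once, instead of repeatedly updating a slice of a sparse vector.
{code}
// Sketch only: sparse-friendly row construction via Breeze's VectorBuilder.
import breeze.linalg.{SparseVector => BSV, VectorBuilder}

def buildSparseRow(nCols: Int, entries: Iterable[(Int, Double)]): BSV[Double] = {
  val builder = new VectorBuilder[Double](nCols)
  entries.foreach { case (j, v) => builder.add(j, v) }
  builder.toSparseVector
}
{code}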

> Very bad performance in BlockMatrix.toIndexedRowMatrix()
> 
>
> Key: SPARK-19368
> URL: https://issues.apache.org/jira/browse/SPARK-19368
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Ohad Raviv
>Priority: Minor
> Attachments: profiler snapshot.png
>
>
> In SPARK-12869, this function was optimized for the case of dense matrices 
> using Breeze. However, I have a case with very very sparse matrices which 
> suffers a great deal from this optimization. A process we have that took 
> about 20 mins now takes about 6.5 hours.
> Here is a sample code to see the difference:
> {quote}
> val n = 4
> val density = 0.0002
> val rnd = new Random(123)
> val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
> (rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble()))
>   .groupBy(t => (t._1,t._2)).map\(t => t._2.last).map\{ case 
> (i,j,d) => (i,(j,d)) }.toSeq
> val entries: RDD\[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
> val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
> Vectors.sparse(n, e._2.toSeq)))
> val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)
> val t1 = System.nanoTime()
> 
> println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t2 = System.nanoTime()
> println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
> println("")
> 
> println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t3 = System.nanoTime()
> println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
> println("")
> {quote}
> I get:
> {quote}
> took: 9404 ms
> 
> took: 57350 ms
> 
> {quote}
> Looking at it a little with a profiler, I see that the problem is with the 
> SliceVector.update() and SparseVector.apply.
> I currently work-around this by doing:
> {quote}
> blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
> {quote}
> like it was in version 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()

2017-01-26 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839673#comment-15839673
 ] 

Ohad Raviv commented on SPARK-19368:


caused by..

> Very bad performance in BlockMatrix.toIndexedRowMatrix()
> 
>
> Key: SPARK-19368
> URL: https://issues.apache.org/jira/browse/SPARK-19368
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Ohad Raviv
> Attachments: profiler snapshot.png
>
>
> In SPARK-12869, this function was optimized for the case of dense matrices 
> using Breeze. However, I have a case with very very sparse matrices which 
> suffers a great deal from this optimization. A process we have that took 
> about 20 mins now takes about 6.5 hours.
> Here is a sample code to see the difference:
> {quote}
> val n = 4
> val density = 0.0002
> val rnd = new Random(123)
> val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
> (rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble()))
>   .groupBy(t => (t._1,t._2)).map\(t => t._2.last).map\{ case 
> (i,j,d) => (i,(j,d)) }.toSeq
> val entries: RDD\[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
> val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
> Vectors.sparse(n, e._2.toSeq)))
> val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)
> val t1 = System.nanoTime()
> 
> println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t2 = System.nanoTime()
> println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
> println("")
> 
> println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t3 = System.nanoTime()
> println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
> println("")
> {quote}
> I get:
> {quote}
> took: 9404 ms
> 
> took: 57350 ms
> 
> {quote}
> Looking at it a little with a profiler, I see that the problem is with the 
> SliceVector.update() and SparseVector.apply.
> I currently work-around this by doing:
> {quote}
> blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
> {quote}
> like it was in version 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()

2017-01-26 Thread Ohad Raviv (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-19368:
---
Attachment: profiler snapshot.png

> Very bad performance in BlockMatrix.toIndexedRowMatrix()
> 
>
> Key: SPARK-19368
> URL: https://issues.apache.org/jira/browse/SPARK-19368
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Ohad Raviv
> Attachments: profiler snapshot.png
>
>
> In SPARK-12869, this function was optimized for the case of dense matrices 
> using Breeze. However, I have a case with very very sparse matrices which 
> suffers a great deal from this optimization. A process we have that took 
> about 20 mins now takes about 6.5 hours.
> Here is a sample code to see the difference:
> {quote}
> val n = 4
> val density = 0.0002
> val rnd = new Random(123)
> val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
> (rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble()))
>   .groupBy(t => (t._1,t._2)).map\(t => t._2.last).map\{ case 
> (i,j,d) => (i,(j,d)) }.toSeq
> val entries: RDD\[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
> val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
> Vectors.sparse(n, e._2.toSeq)))
> val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)
> val t1 = System.nanoTime()
> 
> println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t2 = System.nanoTime()
> println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
> println("")
> 
> println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t3 = System.nanoTime()
> println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
> println("")
> {quote}
> I get:
> {quote}
> took: 9404 ms
> 
> took: 57350 ms
> 
> {quote}
> Looking at it a little with a profiler, I see that the problem is with the 
> SliceVector.update() and SparseVector.apply.
> I currently work-around this by doing:
> {quote}
> blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
> {quote}
> like it was in version 1.6.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()

2017-01-26 Thread Ohad Raviv (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-19368:
---
Description: 
In SPARK-12869, this function was optimized for the case of dense matrices 
using Breeze. However, I have a case with very very sparse matrices which 
suffers a great deal from this optimization. A process we have that took about 
20 mins now takes about 6.5 hours.
Here is a sample code to see the difference:
{quote}
val n = 4
val density = 0.0002
val rnd = new Random(123)
val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
(rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble()))
  .groupBy(t => (t._1,t._2)).map\(t => t._2.last).map\{ case 
(i,j,d) => (i,(j,d)) }.toSeq
val entries: RDD\[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
Vectors.sparse(n, e._2.toSeq)))
val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)

val t1 = System.nanoTime()

println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t2 = System.nanoTime()
println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
println("")

println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t3 = System.nanoTime()
println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
println("")
{quote}

I get:
{quote}
took: 9404 ms

took: 57350 ms

{quote}

Looking at it a little with a profiler, I see that the problem is with the 
SliceVector.update() and SparseVector.apply.

I currently work around this by doing:
{quote}
blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
{quote}
like it was in version 1.6.




  was:
In SPARK-12869, this function was optimized for the case of dense matrices 
using Breeze. However, I have a case with very very sparse matrices which 
suffers a great deal from this optimization. A process we have that took about 
20 mins now takes about 6.5 hours.
Here is a sample code to see the difference:
{quote}
val n = 4
val density = 0.0002
val rnd = new Random(123)
val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
(rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble()))
  .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case 
(i,j,d) => (i,(j,d)) }.toSeq
val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
Vectors.sparse(n, e._2.toSeq)))
val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)

val t1 = System.nanoTime()

println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t2 = System.nanoTime()
println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
println("")

println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t3 = System.nanoTime()
println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
println("")
{quote}

I get:
{quote}
took: 9404 ms

took: 57350 ms

{quote}

Looking at it a little with a profiler, I see that the problem is with the 
SliceVector.update() and SparseVector.apply.

I currently work around this by doing:
BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
like it was in the previous version.





> Very bad performance in BlockMatrix.toIndexedRowMatrix()
> 
>
> Key: SPARK-19368
> URL: https://issues.apache.org/jira/browse/SPARK-19368
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Ohad Raviv
>
> In SPARK-12869, this function was optimized for the case of dense matrices 
> using Breeze. However, I have a case with very very sparse matrices which 
> suffers a great deal from this optimization. A process we have that took 
> about 20 mins now takes about 6.5 hours.
> Here is a sample code to see the difference:
> {quote}
> val n = 4
> val density = 0.0002
> val rnd = new Random(123)
> val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
> (rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble()))
>   .groupBy(t => (t._1,t._2)).map\(t 

[jira] [Created] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()

2017-01-26 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-19368:
--

 Summary: Very bad performance in BlockMatrix.toIndexedRowMatrix()
 Key: SPARK-19368
 URL: https://issues.apache.org/jira/browse/SPARK-19368
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 2.1.0, 2.0.0
Reporter: Ohad Raviv


In SPARK-12869, this function was optimized for the case of dense matrices 
using Breeze. However, I have a case with very very sparse matrices which 
suffers a great deal from this optimization. A process we have that took about 
20 mins now takes about 6.5 hours.
Here is a sample code to see the difference:
{quote}
val n = 4
val density = 0.0002
val rnd = new Random(123)
val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield 
(rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble()))
  .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case 
(i,j,d) => (i,(j,d)) }.toSeq
val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, 
Vectors.sparse(n, e._2.toSeq)))
val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)

val t1 = System.nanoTime()

println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t2 = System.nanoTime()
println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
println("")

println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t3 = System.nanoTime()
println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
println("")
{quote}

I get:
{quote}
took: 9404 ms

took: 57350 ms

{quote}

Looking at it a little with a profiler, I see that the problem is with the 
SliceVector.update() and SparseVector.apply.

I currently work around this by doing:
BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
like it was in the previous version.






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19230) View creation in Derby gets SQLDataException because definition gets very big

2017-01-16 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823644#comment-15823644
 ] 

Ohad Raviv commented on SPARK-19230:


It looks like this happens since SPARK-13827 - it just caused the attribute names 
to get much longer.
As a not-so-bullet-proof workaround, I am now changing the attribute 
normalization from "gen_attr_" to "ga_" in:
{quote}
  private def normalizedName(n: NamedExpression): String = synchronized \{
"gen_attr_" + exprIdMap.getOrElseUpdate(n.exprId.id, 
nextGenAttrId.getAndIncrement())
  }
{quote}
 ->
{quote}
  private def normalizedName(n: NamedExpression): String = synchronized \{
"ga_" + exprIdMap.getOrElseUpdate(n.exprId.id, 
nextGenAttrId.getAndIncrement())
  }
{quote}

I guess a better solution would be to invoke the Canonicalizer's 
NormalizedAttribute rule only when there is an ambiguity, rather than always.
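
For reference, a minimal sketch that reproduces the overly long generated view 
text described in this ticket (the column count, UDF body, and the Hive-enabled 
`spark` session are illustrative assumptions, not our real mockups):
{code:scala}
// Hypothetical reproduction sketch: build a wide table and a view with a few
// generated columns, so the canonicalized view definition text grows toward
// Derby's LONG VARCHAR limit.
val nCols = 1000
val cols = (1 to nCols).map(i => s"field_name_$i string").mkString(",\n")
spark.sql(s"create table t1 (\n$cols\n)")

spark.udf.register("some_udf", (s: String) => if (s == null) null else s.trim)
val calcCols = (1 to 10).map(i => s"some_udf(field_name_$i) as field_calc$i").mkString(",\n  ")
spark.sql(s"create view v1 as select *,\n  $calcCols\nfrom t1")
{code}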


> View creation in Derby gets SQLDataException because definition gets very big
> -
>
> Key: SPARK-19230
> URL: https://issues.apache.org/jira/browse/SPARK-19230
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Ohad Raviv
>
> Somewhat related to SPARK-6024.
> In our test mockups we have a process that creates a pretty big table 
> definition:
> {quote}
> create table t1 (
> field_name_1 string,
> field_name_2 string,
> field_name_3 string,
> .
> .
> .
> field_name_1000 string
> )
> {quote}
> which succeeds. But then we add some calculated fields on top of it with a 
> view:
> {quote}
> create view v1 as 
> select *, 
>   some_udf(field_name_1) as field_calc1,
>   some_udf(field_name_2) as field_calc2,
>   .
>   .
>   some_udf(field_name_10) as field_calc10
> from t1
> {quote}
> And we get this exception:
> {quote}
> java.sql.SQLDataException: A truncation error was encountered trying to 
> shrink LONG VARCHAR 'SELECT `gen_attr_0` AS `field_name_1`, `gen_attr_1` AS 
> `field_name_2&' to length 32700.
>   at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown 
> Source)
>   at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
> Source)
>   at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
> Source)
>   at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
> Source)
>   at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown 
> Source)
>   at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown 
> Source)
>   at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown 
> Source)
>   at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown 
> Source)
>   at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeLargeUpdate(Unknown 
> Source)
>   at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeUpdate(Unknown 
> Source)
>   at 
> com.jolbox.bonecp.PreparedStatementHandle.executeUpdate(PreparedStatementHandle.java:205)
>   at 
> org.datanucleus.store.rdbms.ParamLoggingPreparedStatement.executeUpdate(ParamLoggingPreparedStatement.java:399)
>   at 
> org.datanucleus.store.rdbms.SQLController.executeStatementUpdate(SQLController.java:439)
>   at 
> org.datanucleus.store.rdbms.request.InsertRequest.execute(InsertRequest.java:410)
>   at 
> org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertTable(RDBMSPersistenceHandler.java:167)
>   at 
> org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertObject(RDBMSPersistenceHandler.java:143)
>   at 
> org.datanucleus.state.JDOStateManager.internalMakePersistent(JDOStateManager.java:3784)
>   at 
> org.datanucleus.state.JDOStateManager.makePersistent(JDOStateManager.java:3760)
>   at 
> org.datanucleus.ExecutionContextImpl.persistObjectInternal(ExecutionContextImpl.java:2219)
>   at 
> org.datanucleus.ExecutionContextImpl.persistObjectWork(ExecutionContextImpl.java:2065)
>   at 
> org.datanucleus.ExecutionContextImpl.persistObject(ExecutionContextImpl.java:1913)
>   at 
> org.datanucleus.ExecutionContextThreadedImpl.persistObject(ExecutionContextThreadedImpl.java:217)
>   at 
> org.datanucleus.api.jdo.JDOPersistenceManager.jdoMakePersistent(JDOPersistenceManager.java:727)
>   at 
> org.datanucleus.api.jdo.JDOPersistenceManager.makePersistent(JDOPersistenceManager.java:752)
>   at 
> org.apache.hadoop.hive.metastore.ObjectStore.createTable(ObjectStore.java:814)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> 

[jira] [Created] (SPARK-19230) View creation in Derby gets SQLDataException because definition gets very big

2017-01-15 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-19230:
--

 Summary: View creation in Derby gets SQLDataException because 
definition gets very big
 Key: SPARK-19230
 URL: https://issues.apache.org/jira/browse/SPARK-19230
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: Ohad Raviv


Somewhat related to SPARK-6024.
In our test mockups we have a process that creates a pretty big table 
definition:
{quote}
create table t1 (
field_name_1 string,
field_name_2 string,
field_name_3 string,
.
.
.
field_name_1000 string
)
{quote}
which succeeds. But then we add some calculated fields on top of it with a view:
{quote}
create view v1 as 
select *, 
  some_udf(field_name_1) as field_calc1,
  some_udf(field_name_2) as field_calc2,
  .
  .
  some_udf(field_name_10) as field_calc10
from t1
{quote}
And we get this exception:
{quote}
java.sql.SQLDataException: A truncation error was encountered trying to shrink 
LONG VARCHAR 'SELECT `gen_attr_0` AS `field_name_1`, `gen_attr_1` AS 
`field_name_2&' to length 32700.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeLargeUpdate(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeUpdate(Unknown Source)
at 
com.jolbox.bonecp.PreparedStatementHandle.executeUpdate(PreparedStatementHandle.java:205)
at 
org.datanucleus.store.rdbms.ParamLoggingPreparedStatement.executeUpdate(ParamLoggingPreparedStatement.java:399)
at 
org.datanucleus.store.rdbms.SQLController.executeStatementUpdate(SQLController.java:439)
at 
org.datanucleus.store.rdbms.request.InsertRequest.execute(InsertRequest.java:410)
at 
org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertTable(RDBMSPersistenceHandler.java:167)
at 
org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertObject(RDBMSPersistenceHandler.java:143)
at 
org.datanucleus.state.JDOStateManager.internalMakePersistent(JDOStateManager.java:3784)
at 
org.datanucleus.state.JDOStateManager.makePersistent(JDOStateManager.java:3760)
at 
org.datanucleus.ExecutionContextImpl.persistObjectInternal(ExecutionContextImpl.java:2219)
at 
org.datanucleus.ExecutionContextImpl.persistObjectWork(ExecutionContextImpl.java:2065)
at 
org.datanucleus.ExecutionContextImpl.persistObject(ExecutionContextImpl.java:1913)
at 
org.datanucleus.ExecutionContextThreadedImpl.persistObject(ExecutionContextThreadedImpl.java:217)
at 
org.datanucleus.api.jdo.JDOPersistenceManager.jdoMakePersistent(JDOPersistenceManager.java:727)
at 
org.datanucleus.api.jdo.JDOPersistenceManager.makePersistent(JDOPersistenceManager.java:752)
at 
org.apache.hadoop.hive.metastore.ObjectStore.createTable(ObjectStore.java:814)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
at com.sun.proxy.$Proxy17.createTable(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1416)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1449)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
at com.sun.proxy.$Proxy19.create_table_with_environment_context(Unknown 
Source)
at 

[jira] [Commented] (SPARK-18861) Spark-SQL inconsistent behavior with "struct" expressions

2016-12-15 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15750729#comment-15750729
 ] 

Ohad Raviv commented on SPARK-18861:


I think it was a problem in v2.0.0.
It is better to resolve it as fixed in version 2.1.

> Spark-SQL inconsistent behavior with "struct" expressions
> -
>
> Key: SPARK-18861
> URL: https://issues.apache.org/jira/browse/SPARK-18861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Ohad Raviv
>
> We are getting strangely inconsistent behavior with expressions involving 
> "struct". Let's start with this simple table:
> {quote}
> Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c").createOrReplaceTempView("t1")
> sql("desc t1").show()
> {quote}
> Then we get this DF:
> {quote}
> |col_name|data_type|comment|
> |   a|  int|   |
> |   b|  int|   |
> |   c|  int|   |
> {quote}
> Now, although we can clearly see that all the fields are of type int, when we 
> run:
> {quote}
> sql("SELECT case when a>b then struct(a,b) else struct(c,c) end from t1")
> {quote}
> we get this error:
> {quote}
> org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (t1.`a` > 
> t1.`b`) THEN struct(t1.`a`, t1.`b`) ELSE struct(t1.`c`, t1.`c`) END' due to 
> data type mismatch: THEN and ELSE expressions should all be same type or 
> coercible to a common type; line 1 pos 7
> {quote}
> if we try this:
> {quote}
> sql("SELECT case when a>b then struct(cast(a as int), cast(b as int)) else 
> struct(cast(c as int), cast(c as int)) end from t1")
> {quote}
> we get another exception:
> {quote}
> requirement failed: Unresolved attributes found when constructing 
> LocalRelation.
> java.lang.IllegalArgumentException: requirement failed: Unresolved attributes 
> found when constructing LocalRelation.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LocalRelation.(LocalRelation.scala:49)
> {quote}
> However, these do work:
> {quote}
> sql("SELECT case when a>b then struct(cast(a as double), cast(b as double)) 
> else struct(cast(c as double), cast(c as double)) end from t1")
> sql("SELECT case when a>b then struct(cast(a as string), cast(b as string)) 
> else struct(cast(c as string), cast(c as string)) end from t1")
> {quote}
> any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18861) Spark-SQL inconsistent behavior with "struct" expressions

2016-12-14 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-18861:
--

 Summary: Spark-SQL inconsistent behavior with "struct" expressions
 Key: SPARK-18861
 URL: https://issues.apache.org/jira/browse/SPARK-18861
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Ohad Raviv


We are getting strangely inconsistent behavior with expressions involving 
"struct". Let's start with this simple table:
{quote}
Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c").createOrReplaceTempView("t1")
sql("desc t1").show()
{quote}
Then we get this DF:
{quote}
|col_name|data_type|comment|
|   a|  int|   |
|   b|  int|   |
|   c|  int|   |
{quote}
Now, although we can clearly see that all the fields are of type int, when we run:
{quote}
sql("SELECT case when a>b then struct(a,b) else struct(c,c) end from t1")
{quote}
we get this error:
{quote}
org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (t1.`a` > 
t1.`b`) THEN struct(t1.`a`, t1.`b`) ELSE struct(t1.`c`, t1.`c`) END' due to 
data type mismatch: THEN and ELSE expressions should all be same type or 
coercible to a common type; line 1 pos 7
{quote}
if we try this:
{quote}
sql("SELECT case when a>b then struct(cast(a as int), cast(b as int)) else 
struct(cast(c as int), cast(c as int)) end from t1")
{quote}
we get another exception:
{quote}
requirement failed: Unresolved attributes found when constructing LocalRelation.
java.lang.IllegalArgumentException: requirement failed: Unresolved attributes 
found when constructing LocalRelation.
at scala.Predef$.require(Predef.scala:224)
at 
org.apache.spark.sql.catalyst.plans.logical.LocalRelation.(LocalRelation.scala:49)
{quote}
However, these do work:
{quote}
sql("SELECT case when a>b then struct(cast(a as double), cast(b as double)) 
else struct(cast(c as double), cast(c as double)) end from t1")
sql("SELECT case when a>b then struct(cast(a as string), cast(b as string)) 
else struct(cast(c as string), cast(c as string)) end from t1")
{quote}

any ideas?
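
A possible workaround sketch (not from this ticket; it assumes the mismatch comes 
from the differing struct field names in the two branches): alias the struct 
fields explicitly so both branches produce the same struct type.
{code:scala}
// Hedged workaround sketch: give both branches identical field names so the
// CASE/when expression sees a single struct<x:int,y:int> type.
import org.apache.spark.sql.functions.{col, struct, when}
import spark.implicits._  // assumes a SparkSession named `spark`

val t1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
t1.select(
  when(col("a") > col("b"), struct(col("a").as("x"), col("b").as("y")))
    .otherwise(struct(col("c").as("x"), col("c").as("y")))
    .as("s"))
  .show()
{code}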



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17662) Dedup UDAF

2016-12-14 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15748264#comment-15748264
 ] 

Ohad Raviv commented on SPARK-17662:


When I tried to use your suggestion I encountered some problems, so I opened a 
StackOverflow question; you can find all the details there:
http://stackoverflow.com/questions/41143001/spark-sql-dedup-rows/

> Dedup UDAF
> --
>
> Key: SPARK-17662
> URL: https://issues.apache.org/jira/browse/SPARK-17662
> Project: Spark
>  Issue Type: New Feature
>Reporter: Ohad Raviv
>
> We have a common use case of deduping a table by creation order.
> For example, we have an event log of user actions. A user marks his favorite 
> category from time to time.
> In our analytics we would like to know only the user's last favorite category.
> The data:
> user_id    action_type      value    date
> 123  fav category   1   2016-02-01
> 123  fav category   4   2016-02-02
> 123  fav category   8   2016-02-03
> 123  fav category   2   2016-02-04
> We would like to get only the last update by the date column.
> We could of course do it in SQL:
> select * from (
> select *, row_number() over (partition by user_id,action_type order by date 
> desc) as rnum from tbl)
> where rnum=1;
> But then, I believe it can't be optimized on the mapper side and we'll get 
> all the data shuffled to the reducers instead of partially aggregated on the 
> map side.
> We have written a UDAF for this, but then we have other issues - like 
> blocking predicate push-down for columns.
> Do you have any idea for a proper solution?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2016-12-07 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15731203#comment-15731203
 ] 

Ohad Raviv commented on SPARK-18748:


Accidentally. I already closed the other ticket as a duplicate.

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Ohad Raviv
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> push-down, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-18747) UDF multiple evaluations causes very poor performance

2016-12-06 Thread Ohad Raviv (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv closed SPARK-18747.
--
Resolution: Duplicate

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18747
> URL: https://issues.apache.org/jira/browse/SPARK-18747
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Ohad Raviv
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> for example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference of column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> while it works, in our case we can't do that because the table is too big to 
> cache.
> 2. move back and forth to rdd:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> push-down, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18748) UDF multiple evaluations causes very poor performance

2016-12-06 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-18748:
--

 Summary: UDF multiple evaluations causes very poor performance
 Key: SPARK-18748
 URL: https://issues.apache.org/jira/browse/SPARK-18748
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Ohad Raviv


We have a use case where we have a relatively expensive UDF that needs to be 
calculated. The problem is that instead of being calculated once, it gets 
calculated over and over again.
for example:
{quote}
def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
not null and c<>''").show
{quote}
with the output:
{quote}
blahblah1
blahblah1
blahblah1
+---+
|  c|
+---+
|nothing|
+---+
{quote}
You can see that for each reference of column "c" you will get the println.
That causes very poor performance for our real use case.
This also came out on StackOverflow:
http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/

with two problematic work-arounds:
1. cache() after the first time. e.g.
{quote}
hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
null and c<>''").show
{quote}
while it works, in our case we can't do that because the table is too big to 
cache.

2. move back and forth to rdd:
{quote}
val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
c<>''").show
{quote}
which works, but then we lose some of the optimizations, like predicate 
push-down, etc., and it's very ugly.

Any ideas on how we can make the UDF get calculated just once in a reasonable 
way?
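
One more possible direction, sketched here as an assumption rather than something 
we have tried: on later Spark versions (2.3+) a UDF can be marked non-deterministic, 
so the optimizer does not inline and re-evaluate it for every reference.
{code:scala}
// Hedged sketch, assumes Spark 2.3+ (not the 1.6.1 this ticket is filed against):
// a non-deterministic UDF is not duplicated by the optimizer, so the expensive
// body runs once per row even when the result column is referenced several times.
import org.apache.spark.sql.functions.{lit, udf}

def veryExpensiveCalc(str: String): String = { println("blahblah1"); "nothing" }
val expensiveUdf = udf(veryExpensiveCalc _).asNondeterministic()

spark.range(1)
  .select(expensiveUdf(lit("a")).as("c"))
  .where("c is not null and c <> ''")
  .show()
{code}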





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18747) UDF multiple evaluations causes very poor performance

2016-12-06 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-18747:
--

 Summary: UDF multiple evaluations causes very poor performance
 Key: SPARK-18747
 URL: https://issues.apache.org/jira/browse/SPARK-18747
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Ohad Raviv


We have a use case where we have a relatively expensive UDF that needs to be 
calculated. The problem is that instead of being calculated once, it gets 
calculated over and over again.
for example:
{quote}
def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
not null and c<>''").show
{quote}
with the output:
{quote}
blahblah1
blahblah1
blahblah1
+---+
|  c|
+---+
|nothing|
+---+
{quote}
You can see that for each reference of column "c" you will get the println.
That causes very poor performance for our real use case.
This also came out on StackOverflow:
http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/

with two problematic work-arounds:
1. cache() after the first time. e.g.
{quote}
hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
null and c<>''").show
{quote}
while it works, in our case we can't do that because the table is too big to 
cache.

2. move back and forth to rdd:
{quote}
val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
c<>''").show
{quote}
which works, but then we lose some of the optimizations, like predicate 
push-down, etc., and it's very ugly.

Any ideas on how we can make the UDF get calculated just once in a reasonable 
way?





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17662) Dedup UDAF

2016-11-16 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15672928#comment-15672928
 ] 

Ohad Raviv commented on SPARK-17662:


you're right,
great solution!
I didn't know about the "max(struct(date, *))" syntax.

thanks!
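
For readers of this thread, a small sketch of that approach (assuming the `tbl` 
table from the description and a `spark` session; field names follow the example):
{code:scala}
// Hedged sketch of the max(struct(...)) dedup: the struct compares by its first
// field (date), so max() keeps the latest row per (user_id, action_type), and the
// aggregation can be partially applied on the map side.
val latest = spark.sql("""
  select user_id, action_type, max(struct(`date`, value)) as latest
  from tbl
  group by user_id, action_type
""")
latest.selectExpr("user_id", "action_type", "latest.value", "latest.`date` as `date`").show()
{code}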

> Dedup UDAF
> --
>
> Key: SPARK-17662
> URL: https://issues.apache.org/jira/browse/SPARK-17662
> Project: Spark
>  Issue Type: New Feature
>Reporter: Ohad Raviv
>
> We have a common use case of deduping a table by creation order.
> For example, we have an event log of user actions. A user marks his favorite 
> category from time to time.
> In our analytics we would like to know only the user's last favorite category.
> The data:
> user_id    action_type      value    date
> 123  fav category   1   2016-02-01
> 123  fav category   4   2016-02-02
> 123  fav category   8   2016-02-03
> 123  fav category   2   2016-02-04
> We would like to get only the last update by the date column.
> We could of course do it in SQL:
> select * from (
> select *, row_number() over (partition by user_id,action_type order by date 
> desc) as rnum from tbl)
> where rnum=1;
> But then, I believe it can't be optimized on the mapper side and we'll get 
> all the data shuffled to the reducers instead of partially aggregated on the 
> map side.
> We have written a UDAF for this, but then we have other issues - like 
> blocking predicate push-down for columns.
> Do you have any idea for a proper solution?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17662) Dedup UDAF

2016-09-25 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-17662:
--

 Summary: Dedup UDAF
 Key: SPARK-17662
 URL: https://issues.apache.org/jira/browse/SPARK-17662
 Project: Spark
  Issue Type: New Feature
Reporter: Ohad Raviv


We have a common use case of deduping a table by creation order.
For example, we have an event log of user actions. A user marks his favorite 
category from time to time.
In our analytics we would like to know only the user's last favorite category.
The data:
user_id    action_type      value    date
123  fav category   1   2016-02-01
123  fav category   4   2016-02-02
123  fav category   8   2016-02-03
123  fav category   2   2016-02-04

We would like to get only the last update by the date column.

We could of course do it in SQL:
select * from (
select *, row_number() over (partition by user_id,action_type order by date 
desc) as rnum from tbl)
where rnum=1;

But then, I believe it can't be optimized on the mapper side and we'll get all 
the data shuffled to the reducers instead of partially aggregated on the map 
side.

We have written a UDAF for this, but then we have other issues - like blocking 
predicate push-down for columns.

Do you have any idea for a proper solution?




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16976) KCore implementation

2016-08-09 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15413461#comment-15413461
 ] 

Ohad Raviv commented on SPARK-16976:


Well, it's not for MLlib but for GraphX, and it seems very much in the spirit of 
ConnectedComponents.

> KCore implementation
> 
>
> Key: SPARK-16976
> URL: https://issues.apache.org/jira/browse/SPARK-16976
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Reporter: Ohad Raviv
>Priority: Minor
>
> Added a K-Core implementation to GraphX. Looks like a quick win: a very simple 
> algorithm that can be quite handy in noise filtering. We have already used it 
> a few times.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16976) KCore implementation

2016-08-09 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-16976:
--

 Summary: KCore implementation
 Key: SPARK-16976
 URL: https://issues.apache.org/jira/browse/SPARK-16976
 Project: Spark
  Issue Type: New Feature
  Components: GraphX
Reporter: Ohad Raviv
Priority: Minor


Added a K-Core implementation to GraphX. Looks like a quick win: a very simple 
algorithm that can be quite handy in noise filtering. We have already used it a 
few times.
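
For illustration only (this is not the implementation referred to above), a 
compact k-core sketch on GraphX that iteratively drops vertices whose degree 
falls below k:
{code:scala}
// Hedged sketch: repeatedly remove vertices whose current degree is below k
// until no such vertex remains; what is left is the k-core.
import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

def kCore[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], k: Int): Graph[VD, ED] = {
  var g = graph
  var removed = 1L
  while (removed > 0) {
    // attach the current degree to every vertex (0 if it has no remaining edges)
    val withDeg = g.outerJoinVertices(g.degrees) { (_, attr, d) => (attr, d.getOrElse(0)) }
    removed = withDeg.vertices.filter { case (_, (_, deg)) => deg < k }.count()
    g = withDeg
      .subgraph(vpred = (_, v) => v._2 >= k)   // keep only vertices meeting the threshold
      .mapVertices((_, v) => v._1)             // drop the helper degree attribute
  }
  g
}
{code}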



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16820) Sparse - Sparse matrix multiplication

2016-07-31 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401039#comment-15401039
 ] 

Ohad Raviv commented on SPARK-16820:


I will create a PR soon with a suggested fix, but tell me what you think about 
that.

> Sparse - Sparse matrix multiplication
> -
>
> Key: SPARK-16820
> URL: https://issues.apache.org/jira/browse/SPARK-16820
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.0.0
>Reporter: Ohad Raviv
>
> While working on MCL implementation on Spark we have encountered some 
> difficulties.
> The main part of this process is distributed sparse matrix multiplication 
> that has two main steps:
> 1.Simulate multiply – preparation before the real multiplication in order 
> to see which blocks should be multiplied.
> 2.The actual blocks multiplication and summation.
> In our case the sparse matrix has 50M rows and columns, and 2B non-zeros.
> The current multiplication suffers from these issues:
> 1. A relatively trivial bug (already fixed) in the first step that caused the 
> process to be very slow [SPARK-16469].
> 2. Still, after the bug fix, if we have too many blocks the simulate 
> multiply will take a very long time and will multiply the data many times 
> (O(n^3), where n is the number of blocks).
> 3. Spark supports multiplication only with dense matrices. Thus, it 
> converts a sparse matrix into a dense matrix before the multiplication.
> 4. For summing the intermediate block results, Spark uses Breeze’s CSC 
> matrix operations – here the problem is that it is very inefficient to update 
> a zero entry of a CSC matrix.
> That means that with many blocks (default block size is 1024) – in our case 
> 50M/1024 ~= 50K, the simulate multiply will effectively never finish or will 
> generate 50K*16GB ~= 1000TB of data. On the other hand, if we use a bigger 
> block size, e.g. 100K, we get an OutOfMemoryException in the “toDense” method 
> of the multiply. We have worked around that by implementing both the sparse 
> multiplication and the addition ourselves in a very naïve way – but at least 
> it works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16821) GraphX MCL algorithm

2016-07-31 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-16821:
--

 Summary: GraphX MCL algorithm
 Key: SPARK-16821
 URL: https://issues.apache.org/jira/browse/SPARK-16821
 Project: Spark
  Issue Type: New Feature
  Components: GraphX
Affects Versions: 2.0.0
Reporter: Ohad Raviv


We have had the need to use the MCL clustering algorithm in a project we are 
working on. We have based our implementation on joandre's code:
https://github.com/joandre/MCL_spark
We had a few scaling problems that we had to work around ourselves and opened 
a separate Jira for them.
Since we started working on the algorithm, we have been approached a few times 
by different people who also need it.
Do you think you can add this algorithm to the code base? It looks like there 
isn't any graph clustering algorithm yet.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16820) Sparse - Sparse matrix multiplication

2016-07-31 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-16820:
--

 Summary: Sparse - Sparse matrix multiplication
 Key: SPARK-16820
 URL: https://issues.apache.org/jira/browse/SPARK-16820
 Project: Spark
  Issue Type: New Feature
  Components: ML
Affects Versions: 2.0.0
Reporter: Ohad Raviv


While working on MCL implementation on Spark we have encountered some 
difficulties.
The main part of this process is distributed sparse matrix multiplication that 
has two main steps:
1.  Simulate multiply – preparation before the real multiplication in order 
to see which blocks should be multiplied.
2.  The actual blocks multiplication and summation.

In our case the sparse matrix has 50M rows and columns, and 2B non-zeros.

The current multiplication suffers from these issues:

1.  A relatively trivial bug (already fixed) in the first step that caused the 
process to be very slow [SPARK-16469].
2.  Still, after the bug fix, if we have too many blocks the simulate 
multiply will take a very long time and will multiply the data many times 
(O(n^3), where n is the number of blocks).
3.  Spark supports multiplication only with dense matrices. Thus, it 
converts a sparse matrix into a dense matrix before the multiplication.
4.  For summing the intermediate block results, Spark uses Breeze’s CSC 
matrix operations – here the problem is that it is very inefficient to update a 
zero entry of a CSC matrix.

That means that with many blocks (default block size is 1024) – in our case 
50M/1024 ~= 50K, the simulate multiply will effectively never finish or will 
generate 50K*16GB ~= 1000TB of data. On the other hand, if we use a bigger block 
size, e.g. 100K, we get an OutOfMemoryException in the “toDense” method of the 
multiply. We have worked around that by implementing both the sparse 
multiplication and the addition ourselves in a very naïve way – but at least it 
works.
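
To make the last point concrete, a hedged sketch (ours was similar in spirit, but 
this exact code is illustrative) of a naïve local sparse block multiply that 
accumulates products in a hash map instead of writing into a dense or CSC buffer:
{code:scala}
// Hedged sketch: blocks are kept as COO triplets (row, col, value) and products
// are accumulated in a hash map keyed by (row, col), so zero entries are never
// materialized and no CSC structure has to be updated in place.
import scala.collection.mutable

def multiplyCoo(a: Seq[(Int, Int, Double)],
                b: Seq[(Int, Int, Double)]): Seq[(Int, Int, Double)] = {
  val bByRow = b.groupBy(_._1)                       // index B by row = A's column
  val acc = mutable.HashMap.empty[(Int, Int), Double]
  for ((i, k, av) <- a; (_, j, bv) <- bByRow.getOrElse(k, Nil)) {
    val key = (i, j)
    acc(key) = acc.getOrElse(key, 0.0) + av * bv
  }
  acc.iterator.map { case ((i, j), v) => (i, j, v) }.toSeq
}
{code}
Addition of two COO blocks follows the same pattern: concatenate the triplets and 
sum the values per (row, col) key.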




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16469) Long running Driver task while multiplying big matrices

2016-07-10 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-16469:
--

 Summary: Long running Driver task while multiplying big matrices
 Key: SPARK-16469
 URL: https://issues.apache.org/jira/browse/SPARK-16469
 Project: Spark
  Issue Type: Improvement
  Components: MLlib
Affects Versions: 2.0.0
Reporter: Ohad Raviv
Priority: Minor
 Fix For: 2.0.0


We have a use case of multiplying very big sparse matrices. We multiply 
distributed block matrices of about 1000x1000 blocks, and the simulate multiply 
goes like O(n^4) (n being 1000); it takes about 1.5 hours. We modified it 
slightly with a classical hash map and it now runs in about 30 seconds, O(n^2).
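
A sketch of the hash-map idea (names and shapes here are illustrative, not Spark's 
internal simulate-multiply signature): group B's block indices by block-row once, 
then for each block of A look up only the matching B blocks instead of scanning 
all of them.
{code:scala}
// Hedged sketch: linear preprocessing plus one hash lookup per A block, instead
// of comparing every A block against every B block.
def simulateMultiply(aBlocks: Seq[(Int, Int)],       // (blockRow, blockCol) of A
                     bBlocks: Seq[(Int, Int)]): Map[(Int, Int), Seq[Int]] = {
  val bColsByRow: Map[Int, Seq[Int]] =
    bBlocks.groupBy(_._1).map { case (row, idx) => row -> idx.map(_._2) }
  aBlocks.map { case (aRow, aCol) =>
    // the destination block columns this A block contributes to
    (aRow, aCol) -> bColsByRow.getOrElse(aCol, Nil)
  }.toMap
}
{code}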



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13313) Strongly connected components doesn't find all strongly connected components

2016-03-19 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15199811#comment-15199811
 ] 

Ohad Raviv commented on SPARK-13313:


Hi,
I am trying to use GraphX's SCC and was very concerned by this issue, so I 
took this dataset and ran it with Python's networkx 
strongly_connected_components function, and got exactly the same result: 519 
SCCs with a maximal size of 4051.
So although I don't know what the real result is, the fact that both algorithms 
agree makes me believe that they are correct.
I have also looked at the code and it looks fine to me; I don't agree that you 
should change the edge direction on line 89.
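
For anyone who wants to repeat the comparison on the GraphX side, a hedged sketch 
(the path and the integer-encoded edge list are assumptions about how the dataset 
was prepared; `sc` is a SparkContext):
{code:scala}
// Hedged sketch: run GraphX's SCC and report the number of components and the
// size of the largest one, to compare against the networkx numbers above.
import org.apache.spark.graphx.GraphLoader

val graph = GraphLoader.edgeListFile(sc, "/data/wikispeedia_edges.txt")
val scc = graph.stronglyConnectedComponents(numIter = 20)
val componentSizes = scc.vertices
  .map { case (_, component) => (component, 1L) }
  .reduceByKey(_ + _)
println(componentSizes.count())          // number of SCCs (519 in the run above)
println(componentSizes.map(_._2).max())  // largest SCC size (4051 above)
{code}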

> Strongly connected components doesn't find all strongly connected components
> 
>
> Key: SPARK-13313
> URL: https://issues.apache.org/jira/browse/SPARK-13313
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.6.0
>Reporter: Petar Zecevic
>
> Strongly connected components algorithm doesn't find all strongly connected 
> components. I was using Wikispeedia dataset 
> (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 
> SCCs and one of them had 4051 vertices, which in reality don't have any edges 
> between them. 
> I think the problem could be on line 89 of StronglyConnectedComponents.scala 
> file where EdgeDirection.In should be changed to EdgeDirection.Out. I believe 
> the second Pregel call should use Out edge direction, the same as the first 
> call because the direction is reversed in the provided sendMsg function 
> (message is sent to source vertex and not destination vertex).
> If that is changed (line 89), the algorithm starts finding much more SCCs, 
> but eventually stack overflow exception occurs. I believe graph objects that 
> are changed through iterations should not be cached, but checkpointed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org