[jira] [Updated] (SPARK-41277) Leverage shuffle key as bucketing properties
[ https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv updated SPARK-41277:
--------------------------------
    Summary: Leverage shuffle key as bucketing properties  (was: Save and leverage shuffle key in tblproperties)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41277) Save and leverage shuffle key in tblproperties
[ https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651877#comment-17651877 ]

Ohad Raviv commented on SPARK-41277:
------------------------------------

I managed to put together a quick-and-dirty solution, just to be able to try it on existing processes.

I had to set {_}spark.sql.legacy.createHiveTableByDefault=false{_}, as the Hive provider, Spark and bucketing do not play nicely together (Spark uses a different hash function from Hive). Then I added a custom optimization rule:

{code:java}
object BucketingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan transform {
      case c @ CreateDataSourceTableAsSelectCommand(table, SaveMode.ErrorIfExists, query, _)
          if query.resolved =>
        query match {
          case Aggregate(grouping, _, _) =>
            val numBuckets = SQLConf.get.numShufflePartitions
            val bucketSpec = BucketSpec(
              numBuckets,
              grouping.map(_.asInstanceOf[AttributeReference].name),
              Nil)
            c.copy(table = table.copy(bucketSpec = Some(bucketSpec)))
          case _ => c
        }
    }
  }
}

spark.sessionState.experimentalMethods.extraOptimizations ++= BucketingRule :: Nil
{code}

And it works on this mock:

{code:java}
(1 to 30).map(i => ("k_" + (i - (1 - i % 2)), "v_" + i))
  .toDF("id", "val")
  .createOrReplaceTempView("t")

spark.sql("create table tbl1 select id, max(val) val, count(1) cnt from t group by id")
spark.table("t").write.bucketBy(3, "id").saveAsTable("tbl2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val dfPlan = spark.sql("create table tbl3 as select tbl1.* from tbl1" +
  " join tbl2 on tbl1.id = tbl2.id")
dfPlan.explain(true)
spark.table("tbl3").show()
{code}

You can see that `tbl1` gets created as a bucketed table. I will check whether we get any noticeable performance gain. Meanwhile, could you suggest or point me to a better solution?
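To make the intuition behind the proposal concrete: when both sides of a join were written with the same bucketing function and bucket count, equal keys can only land at the same bucket index, so the join can proceed bucket-by-bucket with no shuffle. A minimal pure-Python sketch of that invariant (CRC32 is an illustrative stand-in for Spark's Murmur3-based bucketing hash, and all helper names here are made up, not Spark APIs):

```python
import zlib

def bucket_id(key: str, num_buckets: int) -> int:
    # Deterministic stand-in for Spark's bucketing hash. Spark actually uses a
    # Murmur3-based hash, while Hive uses a different function -- which is
    # exactly why the Hive provider and Spark bucketing do not mix.
    return zlib.crc32(key.encode("utf-8")) % num_buckets

def bucketize(rows, key_fn, num_buckets):
    # Write side: route every row to its bucket, as a bucketed table write would.
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_id(key_fn(row), num_buckets)].append(row)
    return buckets

def bucketed_join(left_buckets, right_buckets, left_key, right_key):
    # Read side: both sides agree on hash and bucket count, so matching keys
    # can only meet inside the same bucket index -- join each pair of buckets
    # independently, with no re-partitioning (shuffle) step.
    assert len(left_buckets) == len(right_buckets)
    return [
        (l, r)
        for lb, rb in zip(left_buckets, right_buckets)
        for l in lb
        for r in rb
        if left_key(l) == right_key(r)
    ]
```

The per-bucket join produces exactly the same pairs as a naive cross-check of all rows; the saved work is the repartitioning of both inputs.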
[jira] [Commented] (SPARK-41277) Save and leverage shuffle key in tblproperties
[ https://issues.apache.org/jira/browse/SPARK-41277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17649300#comment-17649300 ]

Ohad Raviv commented on SPARK-41277:
------------------------------------

[~gurwls223] - can I please get your opinion here?
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646973#comment-17646973 ]

Ohad Raviv commented on SPARK-41510:
------------------------------------

OK - after diving into the code, I think I found what I was looking for:

{code:python}
spark._sc._python_includes.append("/shared_nfs/my_folder")
{code}

but it is kind of hacky. Tell me what you think, and whether we could make it more official/documented.
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646734#comment-17646734 ]

Ohad Raviv commented on SPARK-41510:
------------------------------------

The conda solution is more for "static" packages. The scenario is this: we're developing a Python library (more than one .py file) and want to do it interactively in notebooks. We have all the modules in some folder, and we add this folder to the driver's sys.path. Then, if we for example use a function from the module inside a UDF, we get: "ModuleNotFoundError: No module named 'some_module'". The reason is that some_module is not on the workers' PYTHONPATH/sys.path. The code itself is accessible to the workers, for example in a shared NFS folder, so all we need is to add the path. We can do it inside the UDF, something like:

{code:python}
if "/shared_nfs/my_folder" not in sys.path:
    sys.path.insert(0, "/shared_nfs/my_folder")
{code}

but that is both very ugly and only a partial solution, as it works only in the UDF case. The suggestion is to have some kind of mechanism to easily add a folder to the workers' sys.path. The option of wrapping the code in a zip/egg and adding it makes for a very long development cycle: it requires restarting the Spark session, and the notebook loses its state. With the suggestion above we could actually edit the Python package interactively and see the changes almost immediately.

Hope it is clearer now.
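For what it's worth, the in-UDF workaround can at least be made idempotent and reusable. A small sketch (the helper name and the NFS path are illustrative, not part of any Spark API):

```python
import sys

def ensure_on_path(folder, paths=None):
    """Prepend folder to the module search path exactly once.

    Calling this at the top of every UDF is safe: repeated invocations on the
    same worker will not grow sys.path. The `paths` parameter exists only so
    the behaviour can be tested against a plain list instead of sys.path.
    """
    target = sys.path if paths is None else paths
    if folder not in target:
        target.insert(0, folder)
    return target
```

Inside a UDF one would call `ensure_on_path("/shared_nfs/my_folder")` before importing the module - still ugly, which is exactly the point of this ticket.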
[jira] [Comment Edited] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646606#comment-17646606 ]

Ohad Raviv edited comment on SPARK-41510 at 12/13/22 12:23 PM:
---------------------------------------------------------------

[~hvanhovell], [~gurwls223] - can you please look at this/refer that to someone?

was (Author: uzadude):
[~hvanhovell] - can you please refer that to someone?
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646606#comment-17646606 ]

Ohad Raviv commented on SPARK-41510:
------------------------------------

[~hvanhovell] - can you please refer that to someone?
[jira] [Updated] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv updated SPARK-41510:
--------------------------------
    Description:
When working interactively with Spark through notebooks in various envs (Databricks/YARN), I often encounter a very frustrating process when trying to add new Python modules, or even change their code, without starting a new Spark session/cluster.

On the driver side it is easy to do things like `sys.path.append()`, but if, for example, a UDF imports a function from a local module, then the pickle boundaries assume that the module exists on the workers, and the job fails with "python module does not exist..".

To update the code "online" I can add an NFS volume to the workers' PYTHONPATH. However, setting the PYTHONPATH on the workers is not easy, as it gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path.

I think all of that could easily be solved if we just add a dedicated `spark.conf` entry that will get merged into the workers' PYTHONPATH, right here: https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94

Please tell me what you think, and I will make the PR. Thanks.

  was:
When working interactively with Spark through notebooks in various envs - Databricks/YARN I often encounter a very frustrating process of trying to add new python modules and even change their code without starting a new spark session/cluster.

in the driver side it is easy to add things like `sys.path.append()` but if for example UDF code is importing function from a local module, then the pickle boundaries will assume that the module exists in the workers. and then I fail on "python module does not exist..".

adding NFS volumes to the workers PYTHONPATH could solve it, but it requires restarting the session/cluster and worse doesn't work in all envs as the PYTHONPATH gets overridden by someone (databricks/spark) along the way. a few ugly work around are suggested like running a "dummy" udf on workers to add the folder to the sys.path.

I think all of that could easily be solved if we add a spark.conf to add to the worker PYTHONPATH. here: https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94

please tell me what you think, and I will make the PR. thanks.
[jira] [Created] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
Ohad Raviv created SPARK-41510:
----------------------------------

             Summary: Support easy way for user defined PYTHONPATH in workers
                 Key: SPARK-41510
                 URL: https://issues.apache.org/jira/browse/SPARK-41510
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.3.1
            Reporter: Ohad Raviv

When working interactively with Spark through notebooks in various envs (Databricks/YARN), I often encounter a very frustrating process when trying to add new Python modules, or even change their code, without starting a new Spark session/cluster.

On the driver side it is easy to do things like `sys.path.append()`, but if, for example, a UDF imports a function from a local module, then the pickle boundaries assume that the module exists on the workers, and the job fails with "python module does not exist..".

Adding NFS volumes to the workers' PYTHONPATH could solve it, but it requires restarting the session/cluster and, worse, doesn't work in all envs, as the PYTHONPATH gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path.

I think all of that could easily be solved if we add a spark.conf entry to append to the workers' PYTHONPATH, here: https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94

Please tell me what you think, and I will make the PR. Thanks.
[jira] [Created] (SPARK-41277) Save and leverage shuffle key in tblproperties
Ohad Raviv created SPARK-41277:
----------------------------------

             Summary: Save and leverage shuffle key in tblproperties
                 Key: SPARK-41277
                 URL: https://issues.apache.org/jira/browse/SPARK-41277
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.3.1
            Reporter: Ohad Raviv

I'm not sure if I'm missing something trivial.

In a typical process, many datasets get materialized, and many of them after a shuffle (e.g. a join). They are then involved in further actions, often keyed on the same columns.

Wouldn't it make sense to save the shuffle key along with the table, to avoid unnecessary shuffles?

Also, the implementation seems quite straightforward - just leverage the existing bucketing mechanism.
[jira] [Commented] (SPARK-37752) Python UDF fails when it should not get evaluated
[ https://issues.apache.org/jira/browse/SPARK-37752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465972#comment-17465972 ]

Ohad Raviv commented on SPARK-37752:
------------------------------------

That is what I deduced. Thanks for the answer!
[jira] [Created] (SPARK-37752) Python UDF fails when it should not get evaluated
Ohad Raviv created SPARK-37752:
----------------------------------

             Summary: Python UDF fails when it should not get evaluated
                 Key: SPARK-37752
                 URL: https://issues.apache.org/jira/browse/SPARK-37752
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.4
            Reporter: Ohad Raviv

Haven't checked on newer versions yet.

If I define in Python:

{code:python}
def udf1(col1):
    print(col1[2])
    return "blah"

spark.udf.register("udf1", udf1)
{code}

and then use it in SQL:

{code:sql}
select case when length(c) > 2 then udf1(c) end
from (
  select explode(array("123", "234", "12")) as c
)
{code}

it fails with:

{noformat}
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 155, in
    func = lambda _, it: map(mapper, it)
  File "", line 1, in
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 76, in
    return lambda *a: f(*a)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "", line 3, in udf1
IndexError: string index out of range
{noformat}

Although for the out-of-range row it should not get evaluated at all, as the case-when filters for strings longer than 2 characters.

The same scenario works fine when we instead define a Scala UDF.

I will now check whether it also happens on newer versions.
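Until the evaluation-order question is settled, one common defensive pattern (my own suggestion, not from this thread) is to make the UDF body itself tolerate the rows the surrounding CASE WHEN was supposed to filter out, instead of relying on the filter running first:

```python
def udf1_defensive(col1):
    # Spark gives no guarantee that the CASE WHEN condition is evaluated
    # before the UDF, so guard against short (or null) inputs explicitly
    # rather than letting col1[2] raise IndexError.
    if col1 is None or len(col1) <= 2:
        return None
    return col1[2]
```

Registered in place of `udf1`, this returns NULL for the rows the CASE WHEN would have excluded anyway, so the query result is unchanged while the crash disappears.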
[jira] [Created] (SPARK-34416) Support avroSchemaUrl in addition to avroSchema
Ohad Raviv created SPARK-34416:
----------------------------------

             Summary: Support avroSchemaUrl in addition to avroSchema
                 Key: SPARK-34416
                 URL: https://issues.apache.org/jira/browse/SPARK-34416
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0, 2.3.0, 3.2.0
            Reporter: Ohad Raviv

We have a use case in which we read a huge table in Avro format - about 30k columns. Using the default Hive reader, `AvroGenericRecordReader`, it just hangs forever: after 4 hours not even one task had finished.

We tried instead to use `spark.read.format("com.databricks.spark.avro").load(..)`, but failed with:

{noformat}
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema ..
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:421)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
  ... 53 elided
{noformat}

because the file schema contains duplicate column names (when compared case-insensitively).

So we wanted to provide a user schema with non-duplicated fields, but the schema is huge - a few MB - and it is not practical to provide it inline in JSON format. We therefore patched spark-avro to also accept `avroSchemaUrl` in addition to `avroSchema`, and it worked perfectly.
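The failing check is easy to reproduce in spirit. A pure-Python sketch of a case-insensitive duplicate-column check (illustrative only, not Spark's actual `SchemaUtils` code; the function name is made up):

```python
from collections import Counter

def duplicate_columns(names, case_sensitive=False):
    """Return column names that appear more than once.

    With case_sensitive=False this mirrors the behaviour that makes Spark
    reject the 30k-column Avro schema: "Id" and "id" collide.
    """
    keyed = names if case_sensitive else [n.lower() for n in names]
    counts = Counter(keyed)
    return sorted(k for k, c in counts.items() if c > 1)
```

Supplying a user schema with the collisions renamed (or reading with case sensitivity enabled) sidesteps the check, which is exactly what the `avroSchemaUrl` patch makes practical for very large schemas.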
[jira] [Commented] (SPARK-30739) unable to turn off Hadoop's trash feature
[ https://issues.apache.org/jira/browse/SPARK-30739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031437#comment-17031437 ]

Ohad Raviv commented on SPARK-30739:
------------------------------------

Closing, as I realized this is actually the documented behaviour [here|https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/core-default.xml]:

{quote}
_fs.trash.checkpoint.interval_ - Number of minutes between trash checkpoints. Should be smaller or equal to fs.trash.interval. If zero, the value is set to the value of fs.trash.interval. Every time the checkpointer runs it creates a new checkpoint out of current and removes checkpoints created more than fs.trash.interval minutes ago.
{quote}

So I decided to use the _fs.trash.classname_ approach.
[jira] [Resolved] (SPARK-30739) unable to turn off Hadoop's trash feature
[ https://issues.apache.org/jira/browse/SPARK-30739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv resolved SPARK-30739.
--------------------------------
    Resolution: Workaround
[jira] [Created] (SPARK-30739) unable to turn off Hadoop's trash feature
Ohad Raviv created SPARK-30739:
----------------------------------

             Summary: unable to turn off Hadoop's trash feature
                 Key: SPARK-30739
                 URL: https://issues.apache.org/jira/browse/SPARK-30739
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Ohad Raviv

We're trying to turn off the `TrashPolicyDefault` in one of our Spark applications by setting `spark.hadoop.fs.trash.interval=0`, but it just stays `360`, as configured in our cluster's `core-site.xml`.

Trying to debug it, we managed to set `spark.hadoop.fs.trash.classname=OtherTrashPolicy`, and that worked. The main difference seems to be that `fs.trash.classname` does not appear in any of the `*-site.xml` files.

When we print the conf that gets initialized in `TrashPolicyDefault`, we get:

{noformat}
Configuration: core-default.xml, core-site.xml, yarn-default.xml, yarn-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@561f0431, file:/hadoop03/yarn/local/usercache/.../hive-site.xml
{noformat}

and:

{noformat}
fs.trash.interval=360 [programatically]
fs.trash.classname=OtherTrashPolicy [programatically]
{noformat}

Any idea why `fs.trash.classname` works but `fs.trash.interval` doesn't?

This seems maybe related to -SPARK-9825-.
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845559#comment-16845559 ] Ohad Raviv commented on SPARK-18748: [~kelemen] - thanks for sharing. > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. 
move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works, but then we lose some of the optimizations like predicate > pushdown, etc., and it's very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way?
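A Spark-free sketch of the failure mode described in this issue: every reference to the derived column re-runs the function, while memoizing the call (the moral equivalent of `.cache()`) evaluates it once. Names here are illustrative, not Spark API.

```python
from functools import lru_cache

calls = {"naive": 0, "cached": 0}

def very_expensive_calc(s):
    calls["naive"] += 1
    return "nothing"

@lru_cache(maxsize=None)
def very_expensive_calc_cached(s):
    calls["cached"] += 1
    return "nothing"

# select c ... where c is not null and c <> '' : three references, three calls
c = very_expensive_calc("a")
_ = very_expensive_calc("a") is not None and very_expensive_calc("a") != ""

# memoized: the same three references hit the computed value once
c2 = very_expensive_calc_cached("a")
_ = very_expensive_calc_cached("a") is not None and very_expensive_calc_cached("a") != ""
```

The counter mirrors the three `blahblah1` prints in the report: one evaluation per reference unless the result is materialized.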
[jira] [Closed] (SPARK-16820) Sparse - Sparse matrix multiplication
[ https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv closed SPARK-16820. -- resolved. > Sparse - Sparse matrix multiplication > - > > Key: SPARK-16820 > URL: https://issues.apache.org/jira/browse/SPARK-16820 > Project: Spark > Issue Type: New Feature > Components: ML >Affects Versions: 2.0.0 >Reporter: Ohad Raviv >Priority: Major > Labels: bulk-closed > > While working on an MCL implementation on Spark we have encountered some > difficulties. > The main part of this process is distributed sparse matrix multiplication, > which has two main steps: > 1. Simulate multiply – preparation before the real multiplication, in order > to see which blocks should be multiplied. > 2. The actual block multiplication and summation. > In our case the sparse matrix has 50M rows and columns, and 2B non-zeros. > The current multiplication suffers from these issues: > 1. A relatively trivial bug, already fixed, in the first step that caused the > process to be very slow [SPARK-16469] > 2. Even after the bug fix, if we have too many blocks the Simulate > multiply will take a very long time and will multiply the data many times > (O(n^3) where n is the number of blocks). > 3. Spark supports only multiplication with dense matrices. Thus, it > converts a sparse matrix into a dense matrix before the multiplication. > 4. For summing the intermediate block results Spark uses Breeze's CSC > matrix operations – here the problem is that it is very inefficient to update > a CSC matrix at a zero value. > That means that with many blocks (default block size is 1024) – in our case > 50M/1024 ~= 50K – the simulate multiply will effectively never finish, or will > generate 50K*16GB ~= 1000TB of data. On the other hand, if we use a bigger > block size, e.g. 100k, we get an OutOfMemoryException in the "toDense" method of > the multiply. We have worked around that by implementing both the > sparse multiplication and addition ourselves in a very naïve way – but at least it > works.
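A back-of-envelope sketch of the O(n^3) claim above: a dense block-pairing loop touches every (i, j, k) triple, so shrinking the block size (more blocks per dimension) grows the simulate-multiply work cubically. This models the complexity argument only, not Spark's actual implementation.

```python
# Count block pairings a naive "simulate multiply" would consider for an
# n_blocks x n_blocks block matrix: every output block (i, j) scans every
# inner index k, i.e. n_blocks ** 3 triples.
def simulated_block_pairs(n_blocks):
    return sum(1 for i in range(n_blocks)
                 for j in range(n_blocks)
                 for k in range(n_blocks))
```

Halving the block size doubles `n_blocks` and multiplies the triple count by 8, which is why 50K blocks (50M rows / 1024) is effectively infeasible.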
[jira] [Commented] (SPARK-16820) Sparse - Sparse matrix multiplication
[ https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845557#comment-16845557 ] Ohad Raviv commented on SPARK-16820: This issue was resolved by SPARK-19368 and SPARK-16469. > Sparse - Sparse matrix multiplication > - > > Key: SPARK-16820 > URL: https://issues.apache.org/jira/browse/SPARK-16820 > Project: Spark > Issue Type: New Feature > Components: ML >Affects Versions: 2.0.0 >Reporter: Ohad Raviv >Priority: Major > Labels: bulk-closed >
[jira] [Commented] (SPARK-27707) Performance issue using explode
[ https://issues.apache.org/jira/browse/SPARK-27707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839544#comment-16839544 ] Ohad Raviv commented on SPARK-27707: [~cloud_fan] - any chance you can take a look? > Performance issue using explode > --- > > Key: SPARK-27707 > URL: https://issues.apache.org/jira/browse/SPARK-27707 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0, 2.4.3 >Reporter: Ohad Raviv >Priority: Major >
[jira] [Created] (SPARK-27707) Performance issue using explode
Ohad Raviv created SPARK-27707: -- Summary: Performance issue using explode Key: SPARK-27707 URL: https://issues.apache.org/jira/browse/SPARK-27707 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.3, 3.0.0 Reporter: Ohad Raviv This is a corner case of SPARK-21657. We have a case where we want to explode an array inside a struct and also keep some other columns of the struct, and we again encounter a huge performance issue. Reproduction code:
{code}
val M = 100000 // illustrative size; M was not defined in the original snippet
val df = spark.sparkContext.parallelize(Seq(("1",
    Array.fill(M)({
      val i = math.random
      (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
    }))))
  .toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
df.write.mode("overwrite").save("/tmp/blah")
{code}
A workaround is projecting before the explode:
{code}
val df = spark.sparkContext.parallelize(Seq(("1",
    Array.fill(M)({
      val i = math.random
      (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
    }))))
  .toDF("col", "arr")
  .selectExpr("col", "struct(col, arr) as st")
  .withColumn("col1", $"st.col")
  .selectExpr("col", "col1", "explode(st.arr) as arr_col")
df.write.mode("overwrite").save("/tmp/blah")
{code}
In this case the optimization done in SPARK-21657:
{code}
// prune unrequired references
case p @ Project(_, g: Generate) if p.references != g.outputSet =>
  val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
  val newChild = prunedChild(g.child, requiredAttrs)
  val unrequired = g.generator.references -- p.references
  val unrequiredIndices = newChild.output.zipWithIndex
    .filter(t => unrequired.contains(t._1))
    .map(_._2)
  p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
{code}
doesn't work, because `p.references` has the whole `st` struct as a reference and not just the projected field. This causes the entire struct, including the huge array field, to get duplicated as many times as there are array elements. I know this is kind of a corner case, but it was really non-trivial to understand.
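A Spark-free back-of-envelope sketch of the cost described above: when the pruning rule cannot drop the struct, each of the N generated rows carries a full copy of it; when the small field is projected out first, each row carries only that field. The byte sizes are illustrative, not taken from the report.

```python
# Model the output volume of exploding an N-element array when each output
# row drags along either the whole struct (pruning defeated) or just the
# single projected field (workaround applied).
def exploded_bytes(n_elements, struct_bytes, field_bytes, pruned):
    per_row = field_bytes if pruned else struct_bytes
    return n_elements * per_row

unpruned = exploded_bytes(1000, struct_bytes=4096, field_bytes=8, pruned=False)
pruned = exploded_bytes(1000, struct_bytes=4096, field_bytes=8, pruned=True)
```

With these illustrative sizes the unpruned plan materializes 512x more data, which matches the order-of-magnitude slowdowns such duplication produces.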
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799163#comment-16799163 ] Ohad Raviv commented on SPARK-18748: [~nimfadora] - thanks, we actually also ended up using this workaround. However, that's really not a good long-term solution. We also tried disabling whole-stage code generation, and that also works. > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major >
[jira] [Created] (SPARK-26645) CSV infer schema bug infers decimal(9,-1)
Ohad Raviv created SPARK-26645: -- Summary: CSV infer schema bug infers decimal(9,-1) Key: SPARK-26645 URL: https://issues.apache.org/jira/browse/SPARK-26645 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Ohad Raviv We have a file /tmp/t1/file.txt that contains only one line "1.18927098E9". Running:
{code:python}
df = spark.read.csv('/tmp/t1', header=False, inferSchema=True, sep='\t')
print df.dtypes
{code}
causes:
{noformat}
ValueError: Could not parse datatype: decimal(9,-1)
{noformat}
I'm not sure where the bug is - inferSchema or dtypes? I saw it is legal to have a decimal with negative scale in the code (CSVInferSchema.scala):
{code:scala}
if (bigDecimal.scale <= 0) {
  // `DecimalType` conversion can fail when
  //   1. The precision is bigger than 38.
  //   2. scale is bigger than precision.
  DecimalType(bigDecimal.precision, bigDecimal.scale)
}
{code}
but what does it mean?
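The inference arithmetic can be reproduced without Spark: Java's `BigDecimal` (like Python's `Decimal`) represents `1.18927098E9` as a 9-digit coefficient with a positive exponent, giving precision 9 and scale -1, which is exactly the `decimal(9,-1)` type string that PySpark then fails to parse.

```python
from decimal import Decimal

d = Decimal("1.18927098E9")            # 118927098 * 10^1
t = d.as_tuple()
precision = len(t.digits)              # number of coefficient digits: 9
scale = -t.exponent                    # BigDecimal-style scale: -1
# Spark would build DecimalType(precision, scale) = decimal(9,-1) from this.
```

So the Scala side legitimately produces a negative-scale decimal, and the bug is arguably on the PySpark `dtypes` parsing side, which does not accept that form.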
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735492#comment-16735492 ] Ohad Raviv commented on SPARK-18748: We're encountering this same problem once again with Spark Structured Streaming. The typical way to read and parse is something like:
{code:java}
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", topic)
  .load()
  .select(parsingUDF(col("value")).as("parsed_struct"))
  .selectExpr("parsed_struct.*")
{code}
and the ".*" expansion causes the UDF to run as many times as the number of columns in the struct. We typically have dozens of columns, meaning dozens of parses per incoming message. Here we can't use any of the bypass solutions mentioned above and in SPARK-17728, as ".cache" and ".rdd" are unsupported operations on a structured streaming dataframe. [~cloud_fan],[~hvanhovell] - maybe you have an idea for a workaround also in the case of streaming? > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major >
[jira] [Updated] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-18748: --- Affects Version/s: (was: 1.6.1) 2.3.0 2.4.0 > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major >
[jira] [Created] (SPARK-26070) another implicit type coercion bug
Ohad Raviv created SPARK-26070: -- Summary: another implicit type coercion bug Key: SPARK-26070 URL: https://issues.apache.org/jira/browse/SPARK-26070 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0, 2.3.0 Reporter: Ohad Raviv Looks like SPARK-22469 introduced a major bug into our system:
{code}
spark.sql("select '' = BD").show()
spark.sql("select '2224' = 2223BD").show()
{code}
which results in:
{noformat}
+-+
|(CAST( AS DOUBLE) = CAST( AS DOUBLE))|
+-+
| true|
+-+

+-+
|(CAST(2224 AS DOUBLE) = CAST(2223 AS DOUBLE))|
+-+
| true|
+-+
{noformat}
This causes downstream transformations to join together unrelated rows just because their ids are close. [~cloud_fan],[~liutang123] - could you please explain this remark further:
{noformat}
// There is no proper decimal type we can pick,
// using double type is the best we can do.
// See SPARK-22469 for details.
{noformat}
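A Spark-free sketch of why casting both sides to double makes close-but-unequal ids compare equal: a double carries roughly 15-16 significant decimal digits, so two large integers that differ by 1 can round to the same double. The literal values below are illustrative, not the (elided) ones from the report.

```python
# Two 21-digit integers one apart: exact integer comparison distinguishes
# them, but both round to the same IEEE-754 double (ulp near 1e20 is 16384).
a = 10**20 + 1
b = 10**20

doubles_collide = (float(a) == float(b))   # precision lost in the cast
exact_compare = (a == b)                   # arbitrary-precision comparison
```

This is the mechanism behind `CAST(... AS DOUBLE) = CAST(... AS DOUBLE)` returning `true` for distinct string ids: the comparison happens after precision has already been discarded.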
[jira] [Created] (SPARK-25963) Optimize generate followed by window
Ohad Raviv created SPARK-25963: -- Summary: Optimize generate followed by window Key: SPARK-25963 URL: https://issues.apache.org/jira/browse/SPARK-25963 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0, 2.4.0 Reporter: Ohad Raviv We've noticed that, in our use cases, when an explode is followed by a window function we can almost always optimize it by repartitioning by the window's partition keys before the explode. For example:
{code:java}
import org.apache.spark.sql.functions._
val N = 1 << 12
spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
val tokens = spark.range(N).selectExpr(
    "floor(id/4) as key",
    "'asd/asd/asd/asd/asd/asd' as url")
  // .repartition("cust_id")
  .selectExpr("*", "explode(split(url, '/')) as token")

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("key", "token")
val res = tokens.withColumn("cnt", count("token").over(w))
res.explain(true)
{code}
{noformat}
== Optimized Logical Plan ==
Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
+- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
   +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
      +- Range (0, 4096, step=1, splits=Some(1))
{noformat}
Currently all the data will be exploded in the first stage, then shuffled, and then aggregated. We can achieve exactly the same computation if we first shuffle the data and, in the second stage, explode and aggregate. I have a PR that tries to resolve this; I'm just not sure I have thought about all the cases.
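A back-of-envelope sketch of the win claimed above: exploding before the shuffle moves `rows * tokens_per_row` records through the exchange, while repartitioning by the window keys first moves only the original rows and explodes after the shuffle. The token count comes from the six `asd` segments in the example URL.

```python
# Records crossing the shuffle boundary in each plan shape, using the
# example's sizes (N = 1 << 12 rows, 6 tokens per url).
N, tokens_per_row = 1 << 12, 6

shuffled_explode_first = N * tokens_per_row    # explode, then exchange
shuffled_repartition_first = N                 # exchange, then explode
savings_factor = shuffled_explode_first / shuffled_repartition_first
```

The computation is identical in both shapes because the explode output for a row depends only on that row, so it commutes with a hash repartition on the window keys.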
[jira] [Updated] (SPARK-25951) Redundant shuffle if column is renamed
[ https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-25951: --- Description: we've noticed that sometimes a column rename causes extra shuffle: {code:java} val N = 1 << 12 spark.sql("set spark.sql.autoBroadcastJoinThreshold=0") val t1 = spark.range(N).selectExpr("floor(id/4) as key1") val t2 = spark.range(N).selectExpr("floor(id/4) as key2") import org.apache.spark.sql.functions._ t1.groupBy("key1").agg(count(lit("1")).as("cnt1")) .join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2", "key3"), col("key1")===col("key3")) .explain() {code} results in: {noformat} == Physical Plan == *(6) SortMergeJoin [key1#6L], [key3#22L], Inner :- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0 : +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, cnt1#14L]) : +- Exchange hashpartitioning(key1#6L, 2) :+- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], output=[key1#6L, count#39L]) : +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L] : +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0))) : +- *(1) Range (0, 4096, step=1, splits=1) +- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(key3#22L, 2) +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, cnt2#19L]) +- Exchange hashpartitioning(key2#10L, 2) +- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], output=[key2#10L, count#41L]) +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L] +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0))) +- *(3) Range (0, 4096, step=1, splits=1) {noformat} I was able to track it down to this code in class HashPartitioning: {code:java} case h: HashClusteredDistribution => expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } {code} the semanticEquals returns false 
as it compares key2 and key3 even though key3 is just a rename of key2 > Redundant shuffle if column is renamed > -- > > Key: SPARK-25951 > URL: https://issues.apache.org/jira/browse/SPARK-25951 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ohad Raviv >Priority: Minor >
[jira] [Updated] (SPARK-25951) Redundant shuffle if column is renamed
[ https://issues.apache.org/jira/browse/SPARK-25951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-25951: --- Description: we've noticed that sometimes a column rename causes extra shuffle: {code} val N = 1 << 12 spark.sql("set spark.sql.autoBroadcastJoinThreshold=0") val t1 = spark.range(N).selectExpr("floor(id/4) as key1") val t2 = spark.range(N).selectExpr("floor(id/4) as key2") t1.groupBy("key1").agg(count(lit("1")).as("cnt1")) .join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2", "key3"), col("key1")===col("key3")) .explain() {code} results in: {noformat} == Physical Plan == *(6) SortMergeJoin [key1#6L], [key3#22L], Inner :- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0 : +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, cnt1#14L]) : +- Exchange hashpartitioning(key1#6L, 2) :+- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], output=[key1#6L, count#39L]) : +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L] : +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0))) : +- *(1) Range (0, 4096, step=1, splits=1) +- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(key3#22L, 2) +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, cnt2#19L]) +- Exchange hashpartitioning(key2#10L, 2) +- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], output=[key2#10L, count#41L]) +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L] +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0))) +- *(3) Range (0, 4096, step=1, splits=1) {noformat} I was able to track it down to this code in class HashPartitioning: {code} case h: HashClusteredDistribution => expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } {code} the semanticEquals returns false as it compares key2 and key3 eventhough key3 is 
just a rename of key2 > Redundant shuffle if column is renamed > -- > > Key: SPARK-25951 > URL: https://issues.apache.org/jira/browse/SPARK-25951 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ohad Raviv >Priority: Minor >
[jira] [Created] (SPARK-25951) Redundant shuffle if column is renamed
Ohad Raviv created SPARK-25951:
--
Summary: Redundant shuffle if column is renamed
Key: SPARK-25951
URL: https://issues.apache.org/jira/browse/SPARK-25951
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Ohad Raviv

we've noticed that sometimes a column rename causes an extra shuffle:
{code}
val N = 1 << 12
spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
val t1 = spark.range(N).selectExpr("floor(id/4) as key1")
val t2 = spark.range(N).selectExpr("floor(id/4) as key2")
t1.groupBy("key1").agg(count(lit("1")).as("cnt1"))
  .join(t2.groupBy("key2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("key2", "key3"),
    col("key1") === col("key3"))
  .explain(true)
{code}
results in:
{code}
== Physical Plan ==
*(6) SortMergeJoin [key1#6L], [key3#22L], Inner
:- *(2) Sort [key1#6L ASC NULLS FIRST], false, 0
:  +- *(2) HashAggregate(keys=[key1#6L], functions=[count(1)], output=[key1#6L, cnt1#14L])
:     +- Exchange hashpartitioning(key1#6L, 2)
:        +- *(1) HashAggregate(keys=[key1#6L], functions=[partial_count(1)], output=[key1#6L, count#39L])
:           +- *(1) Project [FLOOR((cast(id#4L as double) / 4.0)) AS key1#6L]
:              +- *(1) Filter isnotnull(FLOOR((cast(id#4L as double) / 4.0)))
:                 +- *(1) Range (0, 4096, step=1, splits=1)
+- *(5) Sort [key3#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key3#22L, 2)
      +- *(4) HashAggregate(keys=[key2#10L], functions=[count(1)], output=[key3#22L, cnt2#19L])
         +- Exchange hashpartitioning(key2#10L, 2)
            +- *(3) HashAggregate(keys=[key2#10L], functions=[partial_count(1)], output=[key2#10L, count#41L])
               +- *(3) Project [FLOOR((cast(id#8L as double) / 4.0)) AS key2#10L]
                  +- *(3) Filter isnotnull(FLOOR((cast(id#8L as double) / 4.0)))
                     +- *(3) Range (0, 4096, step=1, splits=1)
{code}
I was able to track it down to this code in class HashPartitioning:
{code}
case h: HashClusteredDistribution =>
  expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
    case (l, r) => l.semanticEquals(r)
  }
{code}
the semanticEquals returns false as it compares key2 and key3, even though key3 is just a rename of key2.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
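The redundancy being reported can be modeled outside Spark entirely: a projection that only renames a column leaves every row's values, and therefore its hash-partition assignment, unchanged, so the second Exchange on key3 re-shuffles already correctly partitioned data. A minimal Python sketch (the hash function and all names here are illustrative assumptions, not Spark's Murmur3-based HashPartitioning):

```python
# Model: rows hash-partitioned on "key2", then renamed to "key3".
# The partition layout is identical before and after the rename.

NUM_PARTITIONS = 2

def hash_partition(rows, key):
    """Assign each row to a partition by hashing its key column."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[hash(row[key]) % NUM_PARTITIONS].append(row)
    return parts

rows = [{"key2": i // 4, "cnt2": 1} for i in range(16)]
before = hash_partition(rows, "key2")

# withColumnRenamed("key2", "key3"): same values, new name.
renamed = [{"key3": r["key2"], "cnt2": r["cnt2"]} for r in rows]
after = hash_partition(renamed, "key3")

# Same rows land in the same partitions, so a re-shuffle on key3 is wasted work.
assert [[r["key2"] for r in p] for p in before] == \
       [[r["key3"] for r in p] for p in after]
```

This is exactly the fact `semanticEquals` fails to see: it compares the attributes by identity rather than tracing the alias back to the original expression.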
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625448#comment-16625448 ]

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:15 AM:
-
{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened. The Optimizer should understand this on its own.
I understand my original mistake in the example, and have changed it. Try now.

was (Author: uzadude):
{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened. The Optimizer should understand this on its own.

> predicate push down doesn't work with simple compound partition spec
>
> Key: SPARK-23985
> URL: https://issues.apache.org/jira/browse/SPARK-23985
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Ohad Raviv
> Priority: Minor
>
> while predicate push down works with this query:
> {code:sql}
> select * from (
>    select *, row_number() over (partition by a order by b) from t1
> )z
> where a>1
> {code}
> it doesn't work with:
> {code:sql}
> select * from (
>    select *, row_number() over (partition by concat(a,'lit') order by b) from t1
> )z
> where a>1
> {code}
>
> I added a test to FilterPushdownSuite which I think recreates the problem:
> {code}
> test("Window: predicate push down -- ohad") {
>   val winExpr = windowExpr(count('b),
>     windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
>   val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
>   val correctAnswer = testRelation
>     .where('a > 1).select('a, 'b, 'c)
>     .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
>     .select('a, 'b, 'c, 'window).analyze
>   comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
> }
> {code}
> will try to create a PR with a correction
[jira] [Updated] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv updated SPARK-23985:
---
Description:
while predicate push down works with this query:
{code:sql}
select * from (
   select *, row_number() over (partition by a order by b) from t1
)z
where a>1
{code}
it doesn't work with:
{code:sql}
select * from (
   select *, row_number() over (partition by concat(a,'lit') order by b) from t1
)z
where a>1
{code}
I added a test to FilterPushdownSuite which I think recreates the problem:
{code}
test("Window: predicate push down -- ohad") {
  val winExpr = windowExpr(count('b),
    windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
  val correctAnswer = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze
  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}
{code}
will try to create a PR with a correction

was:
while predicate push down works with this query:
{code:sql}
select *, row_number() over (partition by a order by b) from t1 where a>1
{code}
it doesn't work with:
{code:sql}
select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>1
{code}
I added a test to FilterPushdownSuite which I think recreates the problem:
{code:scala}
test("Window: predicate push down -- ohad") {
  val winExpr = windowExpr(count('b),
    windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
  val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
  val correctAnswer = testRelation
    .where('a > 1).select('a, 'b, 'c)
    .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
    .select('a, 'b, 'c, 'window).analyze
  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}
{code}
will try to create a PR with a correction
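The legality of the pushdown asserted by the test above can be checked without Catalyst: the window is partitioned by concat(a,'lit'), a function of column a alone, so a predicate on a keeps or drops whole partitions and can never change a surviving row's row_number. A small Python sketch of that equivalence (all names and data are illustrative assumptions):

```python
# Compare filtering AFTER computing row_number() over (partition by
# concat(a,'lit') order by b) with filtering BEFORE (the pushed-down plan).
from itertools import groupby

def with_row_number(rows):
    """row_number() over (partition by concat(a,'lit') order by b)."""
    out = []
    keyed = sorted(rows, key=lambda r: (r["a"] + "lit", r["b"]))
    for _, grp in groupby(keyed, key=lambda r: r["a"] + "lit"):
        for i, r in enumerate(grp, start=1):
            out.append({**r, "rn": i})
    return sorted(out, key=lambda r: (r["a"], r["b"]))

rows = [{"a": str(i % 3), "b": i} for i in range(9)]
pred = lambda r: r["a"] > "1"          # predicate on the base column only

late  = [r for r in with_row_number(rows) if pred(r)]   # filter after window
early = with_row_number([r for r in rows if pred(r)])   # filter pushed down
assert early == late                                    # same rows, same rn
```

This is the property the proposed FilterPushdownSuite test encodes: the filter may move below the Window node because its references are a subset of the columns determining the partition expression.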
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625448#comment-16625448 ]

Ohad Raviv commented on SPARK-23985:

{quote}You should move where("a>'1'") before withColumn:{quote}
This is exactly the issue I've opened. The Optimizer should understand this on its own.
[jira] [Comment Edited] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625436#comment-16625436 ]

Ohad Raviv edited comment on SPARK-23985 at 9/24/18 7:07 AM:
-
the same is true for Spark 2.4:
{code}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", "id").write.saveAsTable("t1")

val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{noformat}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 1)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct

== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], [_w0#29], [b#12L ASC NULLS FIRST]
      +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#29, 1)
            +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625436#comment-16625436 ]

Ohad Raviv commented on SPARK-23985:

the same is true for Spark 2.4:
{code}
sparkSession.range(10).selectExpr("cast(id as string) as a", "id as b", "id").write.saveAsTable("t1")

val w = sparkSession.sql(
  "select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>'1'")
w.explain

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
sparkSession.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
plans:
{code}
== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22]
+- Window [row_number() windowspecdefinition(_w0#23, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY concat(a, lit) ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#22], [_w0#23], [b#12L ASC NULLS FIRST]
   +- *(2) Sort [_w0#23 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_w0#23, 1)
         +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#23]
            +- *(1) Filter (isnotnull(a#11) && (a#11 > 1))
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct

== Physical Plan ==
*(3) Project [a#11, b#12L, id#13L, d#28]
+- *(3) Filter (isnotnull(a#11) && (a#11 > 1))
   +- Window [row_number() windowspecdefinition(_w0#29, b#12L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#28], [_w0#29], [b#12L ASC NULLS FIRST]
      +- *(2) Sort [_w0#29 ASC NULLS FIRST, b#12L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#29, 1)
            +- *(1) Project [a#11, b#12L, id#13L, concat(a#11, lit) AS _w0#29]
               +- *(1) FileScan parquet default.t1[a#11,b#12L,id#13L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:../catalyst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625422#comment-16625422 ]

Ohad Raviv commented on SPARK-23985:

You're right, that's very strange. Looks like something got lost in translation. When I run your example (which is actually mine...) I indeed get the right plan. However, if I try my original code I still get the un-optimized plan (with Spark 2.3):
{code}
import org.apache.spark.sql.functions._

spark.range(10).selectExpr(
  "cast(id as string) a",
  "id as b").write.saveAsTable("t1")

val windowSpec = Window.partitionBy(concat(col("a"), lit("lit"))).orderBy("b")
spark.table("t1").withColumn("d", row_number() over windowSpec)
  .where("a>'1'")
  .explain
{code}
{code}
== Physical Plan ==
*(3) Project [a#8, b#9L, d#13]
+- *(3) Filter (isnotnull(a#8) && (a#8 > 1))
   +- Window [row_number() windowspecdefinition(_w0#14, b#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#13], [_w0#14], [b#9L ASC NULLS FIRST]
      +- *(2) Sort [_w0#14 ASC NULLS FIRST, b#9L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_w0#14, 2)
            +- *(1) Project [a#8, b#9L, concat(a#8, lit) AS _w0#14]
               +- *(1) FileScan parquet unitest.t1[a#8,b#9L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[../t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
can you understand the diff?
[jira] [Updated] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ohad Raviv updated SPARK-24528:
---
Description:
Closely related to SPARK-24410, we're trying to optimize a very common use case we have of getting the most updated row by id from a fact table.
We're saving the table bucketed to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted.

here's a good example:
{code:java}
sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(col("key"))
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1")
{code}
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
   +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
{code}
and here's a bad example, but more realistic:
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
   +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
      +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
{code}
I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns set
  // Current solution is to check if all the buckets have a single file in it
  val files = selectedPartitions.flatMap(partition => partition.files)
  val bucketToFilesGrouping =
    files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
  val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
{code}
so obviously the code avoids dealing with this situation now... could you think of a way to solve this or bypass it?
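The quoted guard can be modeled in a few lines: the scan only reports its output as sorted when every bucket maps to at most one file, because several locally sorted files concatenated per bucket are not globally sorted. A rough Python sketch (the file-name pattern here is an illustrative assumption standing in for what BucketingUtils.getBucketId parses):

```python
# Model of the single-file-per-bucket check in DataSourceScanExec.
from collections import defaultdict
import re

def bucket_id(file_name):
    """Extract a bucket id from a name like 'part-00000_00002.c000.parquet' (assumed format)."""
    m = re.search(r"_(\d{5})\.", file_name)
    return int(m.group(1)) if m else None

def can_trust_sort_order(file_names):
    """True only if every bucket has at most one file."""
    buckets = defaultdict(list)
    for f in file_names:
        buckets[bucket_id(f)].append(f)
    return all(len(fs) <= 1 for fs in buckets.values())

# One file per bucket: the on-disk sort order is usable.
assert can_trust_sort_order(["part-00000_00000.c000.parquet",
                             "part-00001_00001.c000.parquet"])
# Two files in bucket 0 (e.g. after two writes): Spark must add a Sort.
assert not can_trust_sort_order(["part-00000_00000.c000.parquet",
                                 "part-00002_00000.c000.parquet"])
```

The bad plan above is the second case generalized: with shuffle.partitions != number of buckets, Spark no longer trusts the per-bucket ordering and inserts the Sort.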
[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1659#comment-1659 ]

Ohad Raviv commented on SPARK-24528:

After digging a little bit in the code and Jira I understand that this is just a special case of SPARK-2926, just that the performance boost is greater. Over there they deal with moving the sort work from reducers to mappers, and show a reducer performance boost of ~10x and an overall performance boost of ~2x (I'm not sure why it never got merged). In our case, because the data is already sorted in the buckets, we should expect this great 10x boost! Because most of the needed code is already there, I guess it would be wise to migrate it (although it contains some fancier things, like the Tiered Merger, that I'm not sure we need).
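For context, the aggregation being optimized, max(struct(t1, *)) group by key, picks per key the row whose struct compares largest field by field, i.e. the "most updated" version. A plain-Python model of that semantics, with tuple ordering standing in for Spark's struct comparison (field names follow the example; this is an illustrative sketch, not Spark code):

```python
# "select max(struct(t1, *)) from a1 group by key" as a per-key tuple max.
def latest_per_key(rows):
    best = {}
    for r in rows:
        key = r["key"]
        cand = (r["t1"], r["key"], r["t1"], r["t2"])  # struct(t1, key, t1, t2)
        if key not in best or cand > best[key]:
            best[key] = cand
    return best

# Two versions (t1 = 0, 1) of each of three keys.
rows = [{"key": k, "t1": t, "t2": k + t} for k in range(3) for t in range(2)]
result = latest_per_key(rows)
assert result[0][0] == 1   # per key, the row with max t1 wins
```

When the input arrives sorted by (key, t1), as the bucketed table already guarantees, this reduces to a single streaming pass, which is why the extra Sort operator in the bad plan is pure overhead.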
[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523344#comment-16523344 ] Ohad Raviv commented on SPARK-24528: Hi, well it took me some time to get to it, but here are my design conclusions: # currently all the file scans are done with FileScanRDD. in its current implementation it gets a list of files in each partition and iterates the one after the other. # that means we probably need another FileScanRDD that can "open" all the files and iterate them in a merge sort manner (like maintaing a heap to know what's the next file to iterate from). # the FileScanRDD is created in FileSourceScanExec.createBucketedReadRDD if the data is bucketed. # FileSourceScanExec is created in FileSourceStrategy. # that means we could understand if the data read output is required to be sorted in FileSourceStrategy and percolate this knowledge to the creation of the new FileScan(Sorted?)RDD. # thing to note here is to enable this sorted reading only if it's required otherwise it will cause performance issue. please tell me WDYT. > Missing optimization for Aggregations/Windowing on a bucketed table > --- > > Key: SPARK-24528 > URL: https://issues.apache.org/jira/browse/SPARK-24528 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > Closely related to SPARK-24410, we're trying to optimize a very common use > case we have of getting the most updated row by id from a fact table. > We're saving the table bucketed to skip the shuffle stage, but we're still > "waste" time on the Sort operator evethough the data is already sorted. 
> here's a good example: > {code:java} > sparkSession.range(N).selectExpr( > "id as key", > "id % 2 as t1", > "id % 3 as t2") > .repartition(col("key")) > .write > .mode(SaveMode.Overwrite) > .bucketBy(3, "key") > .sortBy("key", "t1") > .saveAsTable("a1"){code} > {code:java} > sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain > == Physical Plan == > SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, > key#24L, t1, t1#25L, t2, t2#26L))]) > +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, > t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))]) > +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, > Format: Parquet, Location: ...{code} > > and here's a bad example, but more realistic: > {code:java} > sparkSession.sql("set spark.sql.shuffle.partitions=2") > sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain > == Physical Plan == > SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, > key#32L, t1, t1#33L, t2, t2#34L))]) > +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, > t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))]) > +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0 > +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, > Format: Parquet, Location: ... > {code} > > I've traced the problem to DataSourceScanExec#235: > {code:java} > val sortOrder = if (sortColumns.nonEmpty) { > // In case of bucketing, its possible to have multiple files belonging to > the > // same bucket in a given relation. Each of these files are locally sorted > // but those files combined together are not globally sorted. 
Given that, > // the RDD partition will not be sorted even if the relation has sort > columns set > // Current solution is to check if all the buckets have a single file in it > val files = selectedPartitions.flatMap(partition => partition.files) > val bucketToFilesGrouping = > files.map(_.getPath.getName).groupBy(file => > BucketingUtils.getBucketId(file)) > val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= > 1){code} > so obviously the code avoids dealing with this situation for now. > Could you think of a way to solve this or bypass it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
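The heap-based merge proposed in point 2 of the comment above can be sketched as follows. This is a standalone illustration with generic sorted iterators, not actual FileScanRDD code; the name FileScan(Sorted)RDD and the per-file iterators are assumptions for the sketch:

```scala
import scala.collection.mutable

// Merge N locally-sorted iterators into one globally-sorted stream, the way a
// hypothetical FileScan(Sorted)RDD could combine the several files of one bucket.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // Min-heap keyed by each iterator's head element (Scala's PriorityQueue is a
  // max-heap, hence the reversed ordering).
  val heap = mutable.PriorityQueue.empty[(T, BufferedIterator[T])](
    Ordering.by[(T, BufferedIterator[T]), T](_._1).reverse)
  iters.map(_.buffered).foreach(it => if (it.hasNext) heap.enqueue((it.head, it)))
  new Iterator[T] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): T = {
      val (_, it) = heap.dequeue()
      val elem = it.next()                         // advance the winning iterator
      if (it.hasNext) heap.enqueue((it.head, it))  // re-queue it with its new head
      elem
    }
  }
}

// e.g. mergeSorted(Seq(Iterator(1, 4), Iterator(2, 3))).toList yields List(1, 2, 3, 4)
```

Each `next()` costs O(log N) for N files, so the bucket stays sorted without a full Sort operator over the combined data.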
[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511578#comment-16511578 ] Ohad Raviv commented on SPARK-24528: I think the 2nd point better suits my use case. I'll try to look into it. Thanks.
[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511483#comment-16511483 ] Ohad Raviv commented on SPARK-24528: I understand the tradeoff; the question is how we could leverage the local file sorting. I'm sure the extra sort adds significant overhead: we still have to read all the data into memory, spill, etc. If we could push the sorting down into DataSourceScanExec, then instead of reading the files one after another we could merge-stream them in the right order; I'm sure it would be much more effective. With that I'm trying to imitate HBase and the way it dedupes by key.
[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509465#comment-16509465 ] Ohad Raviv commented on SPARK-24528: [~cloud_fan], [~viirya] - Hi, I found an issue somewhat similar to [SPARK-24410]; I would really appreciate it if you could tell me what you think.
[jira] [Created] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
Ohad Raviv created SPARK-24528: -- Summary: Missing optimization for Aggregations/Windowing on a bucketed table Key: SPARK-24528 URL: https://issues.apache.org/jira/browse/SPARK-24528 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0, 2.4.0 Reporter: Ohad Raviv Closely related to SPARK-24410, we're trying to optimize a very common use case we have of getting the most updated row by id from a fact table. We're saving the table bucketed to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted. Here's a good example: {code:java} sparkSession.range(N).selectExpr( "id as key", "id % 2 as t1", "id % 3 as t2") .repartition(col("key")) .write .mode(SaveMode.Overwrite) .bucketBy(3, "key") .sortBy("key", "t1") .saveAsTable("a1"){code} {code:java} sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain == Physical Plan == SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))]) +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))]) +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...{code} And here's a bad example, but more realistic: {code:java} sparkSession.sql("set spark.sql.shuffle.partitions=2") sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain == Physical Plan == SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))]) +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))]) +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0 +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ... 
{code} I've traced the problem to DataSourceScanExec#235: {code:java} val sortOrder = if (sortColumns.nonEmpty) { // In case of bucketing, its possible to have multiple files belonging to the // same bucket in a given relation. Each of these files are locally sorted // but those files combined together are not globally sorted. Given that, // the RDD partition will not be sorted even if the relation has sort columns set // Current solution is to check if all the buckets have a single file in it val files = selectedPartitions.flatMap(partition => partition.files) val bucketToFilesGrouping = files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1){code} so obviously the code avoids dealing with this situation for now. Could you think of a way to solve this or bypass it?
[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables
[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493561#comment-16493561 ] Ohad Raviv commented on SPARK-24410: [~sowen], [~cloud_fan] - could you please check if my assessment is correct? Thanks! > Missing optimization for Union on bucketed tables > - > > Key: SPARK-24410 > URL: https://issues.apache.org/jira/browse/SPARK-24410 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Ohad Raviv >Priority: Major > > A common use-case we have is of a partially aggregated table and daily > increments that we need to further aggregate. We do this by unioning the two > tables and aggregating again. > We tried to optimize this process by bucketing the tables, but currently it > seems that the union operator doesn't leverage the tables being bucketed > (like the join operator does). > For example, for two bucketed tables a1, a2: > {code} > sparkSession.range(N).selectExpr( > "id as key", > "id % 2 as t1", > "id % 3 as t2") > .repartition(col("key")) > .write > .mode(SaveMode.Overwrite) > .bucketBy(3, "key") > .sortBy("t1") > .saveAsTable("a1") > sparkSession.range(N).selectExpr( > "id as key", > "id % 2 as t1", > "id % 3 as t2") > .repartition(col("key")) > .write.mode(SaveMode.Overwrite) > .bucketBy(3, "key") > .sortBy("t1") > .saveAsTable("a2") > {code} > For the join query we get a "SortMergeJoin": > {code} > select * from a1 join a2 on (a1.key=a2.key) > == Physical Plan == > *(3) SortMergeJoin [key#24L], [key#27L], Inner > :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0 > : +- *(1) Project [key#24L, t1#25L, t2#26L] > : +- *(1) Filter isnotnull(key#24L) > :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: > true, Format: Parquet, Location: > InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], > PushedFilters: [IsNotNull(key)], ReadSchema: > struct > +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0 
>+- *(2) Project [key#27L, t1#28L, t2#29L] > +- *(2) Filter isnotnull(key#27L) > +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: > true, Format: Parquet, Location: > InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], > PushedFilters: [IsNotNull(key)], ReadSchema: > struct > {code} > but for aggregation after union we get a shuffle: > {code} > select key,count(*) from (select * from a1 union all select * from a2)z group > by key > == Physical Plan == > *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, > count(1)#36L]) > +- Exchange hashpartitioning(key#25L, 1) >+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], > output=[key#25L, count#38L]) > +- Union > :- *(1) Project [key#25L] > : +- *(1) FileScan parquet default.a1[key#25L] Batched: true, > Format: Parquet, Location: > InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], > PushedFilters: [], ReadSchema: struct > +- *(2) Project [key#28L] > +- *(2) FileScan parquet default.a2[key#28L] Batched: true, > Format: Parquet, Location: > InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], > PushedFilters: [], ReadSchema: struct > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
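The missing Union optimization amounts to a partitioning-compatibility check over the union's children. A minimal sketch of that check, assuming simplified stand-in types (`BucketSpecLike` and `unionOutputBucketing` are hypothetical names, not Spark APIs):

```scala
// If every child of a Union is bucketed by the same columns into the same
// number of buckets, the union output keeps that hash partitioning, so the
// post-union aggregation would not need an Exchange (shuffle).
case class BucketSpecLike(numBuckets: Int, bucketColumns: Seq[String])

def unionOutputBucketing(children: Seq[Option[BucketSpecLike]]): Option[BucketSpecLike] =
  children.reduceLeft { (a, b) =>
    (a, b) match {
      case (Some(x), Some(y)) if x == y => Some(x) // identical bucketing on both sides
      case _                            => None    // otherwise the partitioning is lost
    }
  }

// For a1 and a2 both bucketed by (3, "key") the output bucketing survives;
// if either side is unbucketed (None), a shuffle is still required.
```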
[jira] [Created] (SPARK-24410) Missing optimization for Union on bucketed tables
Ohad Raviv created SPARK-24410: -- Summary: Missing optimization for Union on bucketed tables Key: SPARK-24410 URL: https://issues.apache.org/jira/browse/SPARK-24410 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Ohad Raviv A common use-case we have is of a partially aggregated table and daily increments that we need to further aggregate. We do this by unioning the two tables and aggregating again. We tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the tables being bucketed (like the join operator does). For example, for two bucketed tables a1, a2: {code} sparkSession.range(N).selectExpr( "id as key", "id % 2 as t1", "id % 3 as t2") .repartition(col("key")) .write .mode(SaveMode.Overwrite) .bucketBy(3, "key") .sortBy("t1") .saveAsTable("a1") sparkSession.range(N).selectExpr( "id as key", "id % 2 as t1", "id % 3 as t2") .repartition(col("key")) .write.mode(SaveMode.Overwrite) .bucketBy(3, "key") .sortBy("t1") .saveAsTable("a2") {code} For the join query we get a "SortMergeJoin": {code} select * from a1 join a2 on (a1.key=a2.key) == Physical Plan == *(3) SortMergeJoin [key#24L], [key#27L], Inner :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0 : +- *(1) Project [key#24L, t1#25L, t2#26L] : +- *(1) Filter isnotnull(key#24L) :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0 +- *(2) Project [key#27L, t1#28L, t2#29L] +- *(2) Filter isnotnull(key#27L) +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct {code} But for aggregation after union we get a shuffle: 
{code} select key,count(*) from (select * from a1 union all select * from a2)z group by key == Physical Plan == *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L]) +- Exchange hashpartitioning(key#25L, 1) +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L]) +- Union :- *(1) Project [key#25L] : +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct +- *(2) Project [key#28L] +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
[ https://issues.apache.org/jira/browse/SPARK-23985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439114#comment-16439114 ] Ohad Raviv commented on SPARK-23985: I see in the Optimizer that filters are getting pushed only if they appear in the partitionSpec as they are. It looks like we need to add to Expression some kind of property that indicates whether we can push through it. A more trivial example than Concat would be Struct. [~cloud_fan] - I see you have dealt with this code about a year ago, could you please take a look? Ohad. > predicate push down doesn't work with simple compound partition spec > > > Key: SPARK-23985 > URL: https://issues.apache.org/jira/browse/SPARK-23985 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Ohad Raviv >Priority: Minor > > While predicate push down works with this query: > {code:sql} > select *, row_number() over (partition by a order by b) from t1 where a>1 > {code} > it doesn't work with: > {code:sql} > select *, row_number() over (partition by concat(a,'lit') order by b) from t1 > where a>1 > {code} > > I added a test to FilterPushdownSuite which I think recreates the problem: > {code:scala} > test("Window: predicate push down -- ohad") { > val winExpr = windowExpr(count('b), > windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) > val originalQuery = testRelation.select('a, 'b, 'c, > winExpr.as('window)).where('a > 1) > val correctAnswer = testRelation > .where('a > 1).select('a, 'b, 'c) > .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) > .select('a, 'b, 'c, 'window).analyze > comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) > } > {code} > I will try to create a PR with a correction.
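The Expression property suggested in the comment can be sketched with toy types. Everything here is a simplified stand-in for Catalyst's Expression tree, and `pushable` is the hypothetical flag under discussion, not a real Spark API:

```scala
// Toy expression model. `pushable` captures the safety condition: the
// expression is injective in its inputs, so equal outputs imply equal inputs,
// and a predicate on the inputs therefore removes whole window partitions.
sealed trait Expr {
  def references: Set[String]
  def deterministic: Boolean
  def pushable: Boolean
}
case class Col(name: String) extends Expr {
  val references = Set(name)
  val deterministic = true
  val pushable = true
}
// Appending a fixed literal is injective in its input, so it stays pushable.
case class ConcatLit(child: Expr, lit: String) extends Expr {
  val references = child.references
  val deterministic = child.deterministic
  val pushable = child.pushable
}

// A filter can be pushed below a Window when its references are covered by
// deterministic, pushable partition expressions.
def canPushThroughWindow(filterRefs: Set[String], partitionSpec: Seq[Expr]): Boolean =
  partitionSpec.forall(e => e.deterministic && e.pushable) &&
    filterRefs.subsetOf(partitionSpec.flatMap(_.references).toSet)

// canPushThroughWindow(Set("a"), Seq(ConcatLit(Col("a"), "lit"))) is true,
// matching the `partition by concat(a,'lit')` case from the ticket.
```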
[jira] [Created] (SPARK-23985) predicate push down doesn't work with simple compound partition spec
Ohad Raviv created SPARK-23985: -- Summary: predicate push down doesn't work with simple compound partition spec Key: SPARK-23985 URL: https://issues.apache.org/jira/browse/SPARK-23985 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Ohad Raviv While predicate push down works with this query: {code:sql} select *, row_number() over (partition by a order by b) from t1 where a>1 {code} it doesn't work with: {code:sql} select *, row_number() over (partition by concat(a,'lit') order by b) from t1 where a>1 {code} I added a test to FilterPushdownSuite which I think recreates the problem: {code:scala} test("Window: predicate push down -- ohad") { val winExpr = windowExpr(count('b), windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) val correctAnswer = testRelation .where('a > 1).select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) .select('a, 'b, 'c, 'window).analyze comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } {code} I will try to create a PR with a correction.
[jira] [Created] (SPARK-22910) Wrong results in Spark Job because failed to move to Trash
Ohad Raviv created SPARK-22910: -- Summary: Wrong results in Spark Job because failed to move to Trash Key: SPARK-22910 URL: https://issues.apache.org/jira/browse/SPARK-22910 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0, 2.1.0 Reporter: Ohad Raviv Our Spark job completed with successful status although the saved data was corrupted. What happened is that we have a monthly job; each run overwrites the output of the previous run. We happened to change the sql.shuffle.partitions number between the runs from 2000 to 1000, and the new run hit a warning-level failure while moving the old data to the user's .Trash because it was full. Because it was only a warning, the process continued and wrote the 1000 new files, while leaving most of the remaining old 1000 files in their place. The result was that the final output folder contained a mix of old and new data, which corrupted the downstream process. The post-mortem is relatively easy to understand. {code} hadoop fs -ls /the/folder -rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 /the/folder/part-0.gz . . 
-rwxr-xr-x 3 spark_user spark_user 34899 2017-11-17 06:39 /the/folder/part-01990.gz {code} and in the driver's log: {code} 17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-0.gz java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-0.gz at org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160) at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109) at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90) at org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272) at org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603) at org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586) at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851) at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672) at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283) at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229) at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272) at org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95) at org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.Sp
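The fail-fast behaviour this report argues for can be sketched as follows. `moveToTrash` and `writeNewOutput` below are placeholders standing in for the Hadoop Trash call and the table write, not the actual Hive/Spark code path:

```scala
import java.io.IOException

// If any old output file cannot be moved to trash, abort before writing the
// new output, so a failed cleanup cannot silently leave mixed old/new data
// behind while the job still reports success.
def overwriteOutput(oldFiles: Seq[String],
                    moveToTrash: String => Boolean,
                    writeNewOutput: () => Unit): Unit = {
  oldFiles.foreach { path =>
    if (!moveToTrash(path)) {
      // Fail loudly instead of logging a warning and continuing.
      throw new IOException(s"Failed to move to trash: $path; aborting overwrite")
    }
  }
  writeNewOutput()
}
```

The key design choice is that the job status stays consistent with the data: either the overwrite fully succeeds, or nothing new is written and the failure is visible.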
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241887#comment-16241887 ] Ohad Raviv commented on SPARK-21657: Hi, I created a pull request: https://github.com/apache/spark/pull/19683 I would appreciate it if you could take a look. > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0 >Reporter: Ruslan Dautkhanov > Labels: cache, caching, collections, nested_types, performance, > pyspark, sparksql, sql > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sized nested collection > (0.5m), on recent Xeon processors. > See the attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generates a number of tables, with the same total number of > records across all nested collections (see `scaling` variable in loops). > The `scaling` variable scales up how many nested elements are in each record, but by > the same factor scales down the number of records in the table, so the total number > of records stays the same. > Time grows exponentially (notice the log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At a scaling of 50,000 (see the attached pyspark script), it took 7 hours to > explode the nested collections (\!) of 8k records. > After 1,000 elements in a nested collection, time grows exponentially.
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224420#comment-16224420 ]

Ohad Raviv commented on SPARK-21657:

After some debugging, I think I understand the tricky part here. Because there are outer fields in the query, we set join=true for the Generate class, and because the Generator uses the array as its child, the array cannot be removed from the Generate output. I think that because omitting the original column is so common, it would make sense to add another attribute to the Generate class, like {{omitChild: Boolean}}, and let the Optimizer turn it on with an appropriate Rule. What do you think?
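A hypothetical sketch of the {{omitChild}} idea proposed above. This is pseudocode against Catalyst internals, not a tested patch; the flag name, the field order, and the rule shape are all assumptions:

```scala
// Sketch only: a proposed extra flag on the Generate logical operator that
// lets the planner keep join = true but drop the generator's input column
// (the exploded array) from the operator's output.
case class Generate(
    generator: Generator,
    join: Boolean,
    outer: Boolean,
    omitChild: Boolean,  // proposed: do not emit the array column downstream
    qualifier: Option[String],
    generatorOutput: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode { /* ... */ }

// ...plus an optimizer rule (shape assumed) that enables the flag whenever the
// projection above the Generate no longer references the array column itself:
// case p @ Project(_, g: Generate)
//     if g.join && !p.references.contains(arrayColumnOf(g)) =>
//   p.copy(child = g.copy(omitChild = true))
```

The point of the flag is that the expensive per-row copy of the array could be avoided even when outer fields force join=true.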
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224374#comment-16224374 ]

Ohad Raviv commented on SPARK-21657:

OK, I found the relevant rule:
{code:java|title=Optimizer.scala|borderStyle=solid}
// Turn off `join` for Generate if no column from its child is used
case p @ Project(_, g: Generate)
    if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
  p.copy(child = g.copy(join = false))
{code}
I'm not sure yet why it doesn't work.
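For context, a minimal sketch (not from the thread; it reuses the c1/c_arr column names from the reproduction above) of when that rule can and cannot fire:

```scala
import org.apache.spark.sql.functions.{col, explode}

// p.references = {c1, c2}: c1 comes from the Generate's child, so
// p.references is NOT a subset of g.generatedSet, the rule is skipped,
// the Generate keeps join = true, and the array is carried per output row.
val keepsJoin = df.select(col("c1"), explode(col("c_arr")).as("c2"))

// p.references = {c2}, which IS a subset of g.generatedSet, so the
// optimizer rewrites join = false and the array column is dropped early.
val dropsJoin = df.select(explode(col("c_arr")).as("c2"))
```

This matches the observation later in the thread that exploding only the array is fast while selecting any outer column alongside the explode is slow.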
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224365#comment-16224365 ]

Ohad Raviv commented on SPARK-21657:

After further investigation I believe my assessment is correct: the former case creates a Generate with join=true while the latter has join=false, as you can see in the plans above (I also verified this in a debugger). This causes the very long array of size 100k to be duplicated 100k times, and only afterwards pruned because its column is not in the final projection. I'm not sure what the best way to address this is, perhaps amending the Generate operator according to the projection. In the meantime, in our case, I worked around it by manually adding the outer fields into each of the structs of the array and then exploding only the array. It's an ugly solution, but it reduces our query time from 6 hours to about 2 minutes.
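A rough sketch of that workaround, pushing the outer column into every struct before exploding. This is illustrative only, not the thread's actual code; the case classes, column names, and sample data are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

case class Item(a: String, b: String)
case class ItemWithKey(c1: String, a: String, b: String)

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("k1", Seq(Item("a1", "b1"), Item("a2", "b2")))).toDF("c1", "c_arr")

// 1. Copy the outer column into every struct of the array...
val enriched = df.as[(String, Seq[Item])]
  .map { case (c1, arr) => arr.map(it => ItemWithKey(c1, it.a, it.b)) }
  .toDF("c_arr")

// 2. ...so the select after explode references only the generated column,
//    and the long array is no longer duplicated into every exploded row.
val exploded = enriched.select(explode(col("c_arr")).as("r")).select("r.*")
```

The trade-off is extra storage per element (the key is repeated in every struct) in exchange for avoiding the quadratic copy.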
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223946#comment-16223946 ]

Ohad Raviv commented on SPARK-21657:

Sure. The plan for
{code:java}
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2")).selectExpr("c1", "c2.*")
{code}
is
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('c1, None), ArrayBuffer(c2).*]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
c1: string, _1: string, _2: string, _3: string, _4: string
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, c2#25._4 AS _4#43]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, c2#25._4 AS _4#43]
+- Generate explode(c_arr#7), true, false, [c2#25]
   +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223902#comment-16223902 ]

Ohad Raviv commented on SPARK-21657:

I switched to toArray instead of toList in the above code and got an improvement by a factor of 2, but the main bottleneck remains. Now the diff in the above example between:
{code:java}
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
{code}
and:
{code:java}
val df_exploded = df.select(explode($"c_arr").as("c2"))
{code}
is 128 secs vs. 3 secs. Again I profiled the former and saw that almost all the time is consumed in:
{noformat}
org.apache.spark.unsafe.Platform.copyMemory()  97.548096  23,991 ms (97.5%)
{noformat}
The obvious diff between the execution plans is that the former has two WholeStageCodegen stages and the latter just one. I didn't exactly understand the generated code, but my guess is that in the problematic case the generated explode code actually copies the long array into every exploded row and only filters it out at the end. Please see if you can verify this or think of a workaround.
[jira] [Comment Edited] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222312#comment-16222312 ]

Ohad Raviv edited comment on SPARK-21657 at 10/27/17 12:53 PM:

Hi, I just ran a profiler for this code:
{code:java}
val BASE = 1
val N = 10
val df = sc.parallelize(List(("1234567890",
  (BASE to (BASE+N)).map(x => (x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList
))).toDF("c1", "c_arr")
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
df_exploded.write.mode("overwrite").format("json").save("/tmp/blah_explode")
{code}
and it looks like [~srowen] is right: most of the time is spent in scala.collection.immutable.List.apply() (72.1%), inside org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext() (100%). I logged the generated code and found the problematic part:
{code:java}
if (serializefromobject_funcResult1 != null) {
  serializefromobject_value5 = (scala.collection.immutable.List) serializefromobject_funcResult1;
} else {
  serializefromobject_isNull5 = true;
}
...
while (serializefromobject_loopIndex < serializefromobject_dataLength) {
  MapObjects_loopValue0 = (scala.Tuple4) (serializefromobject_value5.apply(serializefromobject_loopIndex));
{code}
so that causes the quadratic time complexity. However, I'm not sure where the code is that produces a List instead of an Array for the exploded column.
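To make the quadratic behavior concrete, here is a small self-contained Scala sketch (not Spark code) of the indexing asymmetry the generated loop runs into: List.apply(i) walks from the head of the list, so reading element i costs O(i) and indexing over the whole list is quadratic, while an Array gives O(1) access.

```scala
// Demonstrates why the generated `value.apply(loopIndex)` loop above is
// quadratic when `value` is a List but linear when it is an Array.
val n = 2000
val asList  = (1 to n).toList
val asArray = (1 to n).toArray

// Sum elements by positional access, exactly as the generated loop does.
def sumByIndex(get: Int => Int, len: Int): Long = {
  var s = 0L
  var i = 0
  while (i < len) { s += get(i); i += 1 }
  s
}

val viaList  = sumByIndex(asList.apply, n)   // O(n^2) node traversals in total
val viaArray = sumByIndex(asArray.apply, n)  // O(n) memory reads
// Both compute the same sum, n*(n+1)/2; only the cost differs.
```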
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220450#comment-16220450 ]

Ohad Raviv commented on SPARK-21657:

Hi, I wanted to add that we're facing exactly the same issue: 6 hours of work for one row that contains a 250k-element array (of structs of 4 strings). I also wanted to state that if we explode only the array, e.g. in your example:
{code}
cached_df = sqlc.sql('select explode(amft) from ' + table_name)
{code}
it finishes in about 3 mins. It happens in Spark 2.1 and also 2.2, even though SPARK-16998 was resolved.
[jira] [Commented] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()
[ https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15842581#comment-15842581 ]

Ohad Raviv commented on SPARK-19368:

Well, not with the same elegant code. The main problem is that a SparseVector is very inefficient to manipulate. From Breeze's site:
{quote}
You should not be adding lots of values to a SparseVector if you want good speed. SparseVectors have to maintain the invariant that the index array is always sorted, which makes insertions expensive.
{quote}
They then suggest using a VectorBuilder instead, but that is only good for SparseVector; with DenseVector the current implementation is better. So if you want, I can just create two different functions for the Sparse/Dense cases.

> Very bad performance in BlockMatrix.toIndexedRowMatrix()
> --
>
> Key: SPARK-19368
> URL: https://issues.apache.org/jira/browse/SPARK-19368
> Project: Spark
> Issue Type: Improvement
> Components: MLlib
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Ohad Raviv
> Priority: Minor
> Attachments: profiler snapshot.png
>
> In SPARK-12869, this function was optimized for the case of dense matrices using Breeze. However, I have a case with very very sparse matrices which suffers a great deal from this optimization. A process we have that took about 20 mins now takes about 6.5 hours.
> Here is a sample code to see the difference:
> {quote}
> val n = 4
> val density = 0.0002
> val rnd = new Random(123)
> val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble()))
>   .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case (i,j,d) => (i,(j,d)) }.toSeq
> val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
> val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq)))
> val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)
> val t1 = System.nanoTime()
> println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t2 = System.nanoTime()
> println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
> println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
> val t3 = System.nanoTime()
> println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
> {quote}
> I get:
> {quote}
> took: 9404 ms
> took: 57350 ms
> {quote}
> Looking at it a little with a profiler, I see that the problem is with SliceVector.update() and SparseVector.apply.
> I currently work around this by doing:
> {quote}
> blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
> {quote}
> like it was in version 1.6.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
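For reference, a minimal sketch of the VectorBuilder approach Breeze recommends for the sparse case. This is illustrative only (the entries are made up) and assumes Breeze is on the classpath:

```scala
import breeze.linalg.{SparseVector, VectorBuilder}

// VectorBuilder.add() does not re-establish the sorted-index invariant on
// every insertion, so accumulating entries is cheap; the invariant is
// enforced once, when toSparseVector is called.
val n = 1000000
val builder = new VectorBuilder[Double](n)
Seq(5 -> 1.0, 42 -> 2.0, 7 -> 3.0).foreach { case (i, v) => builder.add(i, v) }
val sv: SparseVector[Double] = builder.toSparseVector

// By contrast, sv(i) = x on an existing SparseVector may shift the whole
// index/value arrays to keep them sorted, which is what makes repeated
// per-element updates expensive in the dense-oriented code path.
```

This is why a split into separate sparse and dense code paths, as suggested above, is reasonable: the builder helps only the sparse case.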
[jira] [Commented] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()
[ https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15839673#comment-15839673 ]

Ohad Raviv commented on SPARK-19368:

caused by..
[jira] [Updated] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()
[ https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-19368: --- Attachment: profiler snapshot.png > Very bad performance in BlockMatrix.toIndexedRowMatrix() > > > Key: SPARK-19368 > URL: https://issues.apache.org/jira/browse/SPARK-19368 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 2.0.0, 2.1.0 >Reporter: Ohad Raviv > Attachments: profiler snapshot.png > > > In SPARK-12869, this function was optimized for the case of dense matrices > using Breeze. However, I have a case with very very sparse matrices which > suffers a great deal from this optimization. A process we have that took > about 20 mins now takes about 6.5 hours. > Here is a sample code to see the difference: > {quote} > val n = 4 > val density = 0.0002 > val rnd = new Random(123) > val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield > (rnd.nextInt\(n\), rnd.nextInt\(n\), rnd.nextDouble())) > .groupBy(t => (t._1,t._2)).map\(t => t._2.last).map\{ case > (i,j,d) => (i,(j,d)) }.toSeq > val entries: RDD\[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10) > val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, > Vectors.sparse(n, e._2.toSeq))) > val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n) > val t1 = System.nanoTime() > > println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) > val t2 = System.nanoTime() > println("took: " + (t2 - t1) / 1000 / 1000 + " ms") > println("") > > println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) > val t3 = System.nanoTime() > println("took: " + (t3 - t2) / 1000 / 1000 + " ms") > println("") > {quote} > I get: > {quote} > took: 9404 ms > > took: 57350 ms > > {quote} > Looking at it a little with a profiler, I see that the problem is with the > SliceVector.update() and SparseVector.apply. 
> I currently work around this by doing: > {quote} > blockMatrix.toCoordinateMatrix().toIndexedRowMatrix() > {quote} > like it was in version 1.6. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()
[ https://issues.apache.org/jira/browse/SPARK-19368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-19368: --- Description: In SPARK-12869, this function was optimized for the case of dense matrices using Breeze. However, I have a case with very very sparse matrices which suffers a great deal from this optimization. A process we have that took about 20 mins now takes about 6.5 hours. Here is a sample code to see the difference: {quote} val n = 4 val density = 0.0002 val rnd = new Random(123) val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble())) .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case (i,j,d) => (i,(j,d)) }.toSeq val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10) val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq))) val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n) val t1 = System.nanoTime() println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t2 = System.nanoTime() println("took: " + (t2 - t1) / 1000 / 1000 + " ms") println("") println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t3 = System.nanoTime() println("took: " + (t3 - t2) / 1000 / 1000 + " ms") println("") {quote} I get: {quote} took: 9404 ms took: 57350 ms {quote} Looking at it a little with a profiler, I see that the problem is with the SliceVector.update() and SparseVector.apply. I currently work around this by doing: {quote} blockMatrix.toCoordinateMatrix().toIndexedRowMatrix() {quote} like it was in version 1.6. was: In SPARK-12869, this function was optimized for the case of dense matrices using Breeze. However, I have a case with very very sparse matrices which suffers a great deal from this optimization. A process we have that took about 20 mins now takes about 6.5 hours. 
Here is a sample code to see the difference: {quote} val n = 4 val density = 0.0002 val rnd = new Random(123) val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble())) .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case (i,j,d) => (i,(j,d)) }.toSeq val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10) val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq))) val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n) val t1 = System.nanoTime() println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t2 = System.nanoTime() println("took: " + (t2 - t1) / 1000 / 1000 + " ms") println("") println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t3 = System.nanoTime() println("took: " + (t3 - t2) / 1000 / 1000 + " ms") println("") {quote} I get: {quote} took: 9404 ms took: 57350 ms {quote} Looking at it a little with a profiler, I see that the problem is with the SliceVector.update() and SparseVector.apply. I currently work-around this by doing: BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix() like it was in the previous version. > Very bad performance in BlockMatrix.toIndexedRowMatrix() > > > Key: SPARK-19368 > URL: https://issues.apache.org/jira/browse/SPARK-19368 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 2.0.0, 2.1.0 >Reporter: Ohad Raviv > > In SPARK-12869, this function was optimized for the case of dense matrices > using Breeze. However, I have a case with very very sparse matrices which > suffers a great deal from this optimization. A process we have that took > about 20 mins now takes about 6.5 hours. 
> Here is a sample code to see the difference: > {quote} > val n = 4 > val density = 0.0002 > val rnd = new Random(123) > val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield > (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble())) > .groupBy(t => (t._1,t._2)).map(t =>
[jira] [Created] (SPARK-19368) Very bad performance in BlockMatrix.toIndexedRowMatrix()
Ohad Raviv created SPARK-19368: -- Summary: Very bad performance in BlockMatrix.toIndexedRowMatrix() Key: SPARK-19368 URL: https://issues.apache.org/jira/browse/SPARK-19368 Project: Spark Issue Type: Bug Components: MLlib Affects Versions: 2.1.0, 2.0.0 Reporter: Ohad Raviv In SPARK-12869, this function was optimized for the case of dense matrices using Breeze. However, I have a case with very very sparse matrices which suffers a great deal from this optimization. A process we have that took about 20 mins now takes about 6.5 hours. Here is a sample code to see the difference: {quote} val n = 4 val density = 0.0002 val rnd = new Random(123) val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble())) .groupBy(t => (t._1,t._2)).map(t => t._2.last).map { case (i,j,d) => (i,(j,d)) }.toSeq val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10) val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq))) val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n) val t1 = System.nanoTime() println(mat.toBlockMatrix(1,1).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t2 = System.nanoTime() println("took: " + (t2 - t1) / 1000 / 1000 + " ms") println("") println(mat.toBlockMatrix(1,1).toIndexedRowMatrix().rows.map(_.vector.numActives).sum()) val t3 = System.nanoTime() println("took: " + (t3 - t2) / 1000 / 1000 + " ms") println("") {quote} I get: {quote} took: 9404 ms took: 57350 ms {quote} Looking at it a little with a profiler, I see that the problem is with the SliceVector.update() and SparseVector.apply. I currently work-around this by doing: BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix() like it was in the previous version. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
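The workaround described in the report can be expressed as a small helper. This is a hedged sketch (the helper name is hypothetical), assuming Spark MLlib's distributed-matrix API:

```scala
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRowMatrix}

// Hypothetical helper wrapping the reported workaround: route the
// conversion through CoordinateMatrix (as Spark 1.6 did) instead of the
// Breeze-backed dense path, which is slow for very sparse blocks.
def toIndexedRowMatrixViaCoordinates(mat: BlockMatrix): IndexedRowMatrix =
  mat.toCoordinateMatrix().toIndexedRowMatrix()
```

The coordinate path only touches the stored non-zeros, which is why it wins so clearly at density 0.0002.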
[jira] [Commented] (SPARK-19230) View creation in Derby gets SQLDataException because definition gets very big
[ https://issues.apache.org/jira/browse/SPARK-19230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823644#comment-15823644 ] Ohad Raviv commented on SPARK-19230: looks like it happens since SPARK-13827 - it just caused the attribute names to get much longer. As a not-so-bullet-proof workaround, I now change the attribute normalization from "gen_attr_" -> "ga_" in: {quote} private def normalizedName(n: NamedExpression): String = synchronized { "gen_attr_" + exprIdMap.getOrElseUpdate(n.exprId.id, nextGenAttrId.getAndIncrement()) } {quote} -> {quote} private def normalizedName(n: NamedExpression): String = synchronized { "ga_" + exprIdMap.getOrElseUpdate(n.exprId.id, nextGenAttrId.getAndIncrement()) } {quote} I guess a better solution would be to not always invoke the Canonicalizer's NormalizedAttribute rule, but only when there is an ambiguity. > View creation in Derby gets SQLDataException because definition gets very big > - > > Key: SPARK-19230 > URL: https://issues.apache.org/jira/browse/SPARK-19230 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Ohad Raviv > > somewhat related to SPARK-6024. > In our test mockups we have a process that creates a pretty big table > definition: > {quote} > create table t1 ( > field_name_1 string, > field_name_2 string, > field_name_3 string, > . > . > . > field_name_1000 string > ) > {quote} > which succeeds. But then we add some calculated fields on top of it with a > view: > {quote} > create view v1 as > select *, > some_udf(field_name_1) as field_calc1, > some_udf(field_name_2) as field_calc2, > . > . > some_udf(field_name_10) as field_calc10 > from t1 > {quote} > And we get this exception: > {quote} > java.sql.SQLDataException: A truncation error was encountered trying to > shrink LONG VARCHAR 'SELECT `gen_attr_0` AS `field_name_1`, `gen_attr_1` AS > `field_name_2&' to length 32700. 
> at > org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown > Source) > at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown > Source) > at > org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown > Source) > at > org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown > Source) > at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown > Source) > at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown > Source) > at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown > Source) > at > org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown > Source) > at > org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeLargeUpdate(Unknown > Source) > at > org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeUpdate(Unknown > Source) > at > com.jolbox.bonecp.PreparedStatementHandle.executeUpdate(PreparedStatementHandle.java:205) > at > org.datanucleus.store.rdbms.ParamLoggingPreparedStatement.executeUpdate(ParamLoggingPreparedStatement.java:399) > at > org.datanucleus.store.rdbms.SQLController.executeStatementUpdate(SQLController.java:439) > at > org.datanucleus.store.rdbms.request.InsertRequest.execute(InsertRequest.java:410) > at > org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertTable(RDBMSPersistenceHandler.java:167) > at > org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertObject(RDBMSPersistenceHandler.java:143) > at > org.datanucleus.state.JDOStateManager.internalMakePersistent(JDOStateManager.java:3784) > at > org.datanucleus.state.JDOStateManager.makePersistent(JDOStateManager.java:3760) > at > org.datanucleus.ExecutionContextImpl.persistObjectInternal(ExecutionContextImpl.java:2219) > at > org.datanucleus.ExecutionContextImpl.persistObjectWork(ExecutionContextImpl.java:2065) > at > org.datanucleus.ExecutionContextImpl.persistObject(ExecutionContextImpl.java:1913) > at > 
org.datanucleus.ExecutionContextThreadedImpl.persistObject(ExecutionContextThreadedImpl.java:217) > at > org.datanucleus.api.jdo.JDOPersistenceManager.jdoMakePersistent(JDOPersistenceManager.java:727) > at > org.datanucleus.api.jdo.JDOPersistenceManager.makePersistent(JDOPersistenceManager.java:752) > at > org.apache.hadoop.hive.metastore.ObjectStore.createTable(ObjectStore.java:814) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImp
[jira] [Created] (SPARK-19230) View creation in Derby gets SQLDataException because definition gets very big
Ohad Raviv created SPARK-19230: -- Summary: View creation in Derby gets SQLDataException because definition gets very big Key: SPARK-19230 URL: https://issues.apache.org/jira/browse/SPARK-19230 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Reporter: Ohad Raviv somewhat related to SPARK-6024. In our tests mockups we have a process that creates a pretty big table definition: {quote} create table t1 ( field_name_1 string, field_name_2 string, field_name_3 string, . . . field_name_1000 string ) {quote} which succeeds. But then we add some calculated fields on top of it with a view: {quote} create view v1 as select *, some_udf(field_name_1) as field_calc1, some_udf(field_name_2) as field_calc2, . . some_udf(field_name_10) as field_calc10 from t1 {quote} And we get this exception: {quote} java.sql.SQLDataException: A truncation error was encountered trying to shrink LONG VARCHAR 'SELECT `gen_attr_0` AS `field_name_1`, `gen_attr_1` AS `field_name_2&' to length 32700. at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source) at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source) at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source) at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source) at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source) at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source) at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeStatement(Unknown Source) at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeLargeUpdate(Unknown Source) at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeUpdate(Unknown Source) at com.jolbox.bonecp.PreparedStatementHandle.executeUpdate(PreparedStatementHandle.java:205) at 
org.datanucleus.store.rdbms.ParamLoggingPreparedStatement.executeUpdate(ParamLoggingPreparedStatement.java:399) at org.datanucleus.store.rdbms.SQLController.executeStatementUpdate(SQLController.java:439) at org.datanucleus.store.rdbms.request.InsertRequest.execute(InsertRequest.java:410) at org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertTable(RDBMSPersistenceHandler.java:167) at org.datanucleus.store.rdbms.RDBMSPersistenceHandler.insertObject(RDBMSPersistenceHandler.java:143) at org.datanucleus.state.JDOStateManager.internalMakePersistent(JDOStateManager.java:3784) at org.datanucleus.state.JDOStateManager.makePersistent(JDOStateManager.java:3760) at org.datanucleus.ExecutionContextImpl.persistObjectInternal(ExecutionContextImpl.java:2219) at org.datanucleus.ExecutionContextImpl.persistObjectWork(ExecutionContextImpl.java:2065) at org.datanucleus.ExecutionContextImpl.persistObject(ExecutionContextImpl.java:1913) at org.datanucleus.ExecutionContextThreadedImpl.persistObject(ExecutionContextThreadedImpl.java:217) at org.datanucleus.api.jdo.JDOPersistenceManager.jdoMakePersistent(JDOPersistenceManager.java:727) at org.datanucleus.api.jdo.JDOPersistenceManager.makePersistent(JDOPersistenceManager.java:752) at org.apache.hadoop.hive.metastore.ObjectStore.createTable(ObjectStore.java:814) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114) at com.sun.proxy.$Proxy17.createTable(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1416) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1449) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107) at com.sun.proxy.$Proxy19.create_table_with_environment_context(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.create_table_with_environment_c
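A rough back-of-the-envelope check of why the view text overflows: every column expands to an aliased `gen_attr_N` expression, so the 1,000 base columns alone push the canonicalized SELECT past Derby's 32,700-character LONG VARCHAR limit. A standalone sketch (the field naming is assumed to match the example table):

```scala
// Approximate the canonicalized view text for the 1000-column table above:
// each column becomes something like  `gen_attr_123` AS `field_name_123`
val cols = (1 to 1000).map(i => s"`gen_attr_$i` AS `field_name_$i`")
val selectText = "SELECT " + cols.mkString(", ")
// Roughly 35,000+ characters, i.e. over Derby's 32,700 limit even before
// the calculated fields and the FROM clause are added.
println(selectText.length)
```

This also makes clear why shortening the prefix to "ga_" only buys headroom rather than fixing the underlying growth.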
[jira] [Commented] (SPARK-18861) Spark-SQL inconsistent behavior with "struct" expressions
[ https://issues.apache.org/jira/browse/SPARK-18861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750729#comment-15750729 ] Ohad Raviv commented on SPARK-18861: I think it was a problem in v2.0.0. It is better to resolve it as fixed in version 2.1 > Spark-SQL inconsistent behavior with "struct" expressions > - > > Key: SPARK-18861 > URL: https://issues.apache.org/jira/browse/SPARK-18861 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Ohad Raviv > > We are getting strangely inconsistent behavior with expressions involving > "struct". Let's start with this simple table: > {quote} > Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c").createOrReplaceTempView("t1") > sql("desc t1").show() > {quote} > Then we get this DF: > {quote} > |col_name|data_type|comment| > | a| int| | > | b| int| | > | c| int| | > {quote} > Now, although we can clearly see that all the fields are of type int, when we > run: > {quote} > sql("SELECT case when a>b then struct(a,b) else struct(c,c) end from t1") > {quote} > we get this error: > {quote} > org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (t1.`a` > > t1.`b`) THEN struct(t1.`a`, t1.`b`) ELSE struct(t1.`c`, t1.`c`) END' due to > data type mismatch: THEN and ELSE expressions should all be same type or > coercible to a common type; line 1 pos 7 > {quote} > if we try this: > {quote} > sql("SELECT case when a>b then struct(cast(a as int), cast(b as int)) else > struct(cast(c as int), cast(c as int)) end from t1") > {quote} > we get another exception: > {quote} > requirement failed: Unresolved attributes found when constructing > LocalRelation. > java.lang.IllegalArgumentException: requirement failed: Unresolved attributes > found when constructing LocalRelation. 
> at scala.Predef$.require(Predef.scala:224) > at > org.apache.spark.sql.catalyst.plans.logical.LocalRelation.(LocalRelation.scala:49) > {quote} > However, these do work: > {quote} > sql("SELECT case when a>b then struct(cast(a as double), cast(b as double)) > else struct(cast(c as double), cast(c as double)) end from t1") > sql("SELECT case when a>b then struct(cast(a as string), cast(b as string)) > else struct(cast(c as string), cast(c as string)) end from t1") > {quote} > any ideas? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18861) Spark-SQL inconsistent behavior with "struct" expressions
Ohad Raviv created SPARK-18861: -- Summary: Spark-SQL inconsistent behavior with "struct" expressions Key: SPARK-18861 URL: https://issues.apache.org/jira/browse/SPARK-18861 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0 Reporter: Ohad Raviv We are getting strangely inconsistent behavior with expressions involving "struct". Let's start with this simple table: {quote} Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c").createOrReplaceTempView("t1") sql("desc t1").show() {quote} Then we get this DF: {quote} |col_name|data_type|comment| | a| int| | | b| int| | | c| int| | {quote} Now, although we can clearly see that all the fields are of type int, when we run: {quote} sql("SELECT case when a>b then struct(a,b) else struct(c,c) end from t1") {quote} we get this error: {quote} org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (t1.`a` > t1.`b`) THEN struct(t1.`a`, t1.`b`) ELSE struct(t1.`c`, t1.`c`) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type; line 1 pos 7 {quote} if we try this: {quote} sql("SELECT case when a>b then struct(cast(a as int), cast(b as int)) else struct(cast(c as int), cast(c as int)) end from t1") {quote} we get another exception: {quote} requirement failed: Unresolved attributes found when constructing LocalRelation. java.lang.IllegalArgumentException: requirement failed: Unresolved attributes found when constructing LocalRelation. at scala.Predef$.require(Predef.scala:224) at org.apache.spark.sql.catalyst.plans.logical.LocalRelation.(LocalRelation.scala:49) {quote} However, these do work: {quote} sql("SELECT case when a>b then struct(cast(a as double), cast(b as double)) else struct(cast(c as double), cast(c as double)) end from t1") sql("SELECT case when a>b then struct(cast(a as string), cast(b as string)) else struct(cast(c as string), cast(c as string)) end from t1") {quote} any ideas? 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
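The report's own working examples suggest the practical workaround: cast both CASE branches to the same non-int type so the struct types line up. A minimal sketch, assuming a Spark 2.x `SparkSession` named `spark` and the `t1` temp view from the example:

```scala
// Per the working examples in the report, casting both branches of the
// CASE to double (or string) avoids the type-coercion failure that the
// plain  struct(a,b) / struct(c,c)  form hits.
val df = spark.sql(
  """SELECT CASE WHEN a > b
    |         THEN struct(CAST(a AS double), CAST(b AS double))
    |         ELSE struct(CAST(c AS double), CAST(c AS double))
    |       END AS s
    |FROM t1""".stripMargin)
df.show()
```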
[jira] [Commented] (SPARK-17662) Dedup UDAF
[ https://issues.apache.org/jira/browse/SPARK-17662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748264#comment-15748264 ] Ohad Raviv commented on SPARK-17662: When I tried to use your suggestion I encountered some problems, so I have opened a StackOverflow question and you can find all the details there: http://stackoverflow.com/questions/41143001/spark-sql-dedup-rows/ > Dedup UDAF > -- > > Key: SPARK-17662 > URL: https://issues.apache.org/jira/browse/SPARK-17662 > Project: Spark > Issue Type: New Feature >Reporter: Ohad Raviv > > We have a common use case of deduping a table in creation order. > For example, we have an event log of user actions. A user marks his favorite > category from time to time. > In our analytics we would like to know only the user's last favorite category. > The data: > user_id  action_type  value  date > 123 fav category 1 2016-02-01 > 123 fav category 4 2016-02-02 > 123 fav category 8 2016-02-03 > 123 fav category 2 2016-02-04 > we would like to get only the last update by the date column. > we could of course do it in SQL: > select * from ( > select *, row_number() over (partition by user_id,action_type order by date > desc) as rnum from tbl) > where rnum=1; > but then, I believe it can't be optimized on the mapper side and we'll get > all the data shuffled to the reducers instead of partially aggregated on the > map side. > We have written a UDAF for this, but then we have other issues - like > blocking push-down predicates for columns. > do you have any idea for a proper solution? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15731203#comment-15731203 ] Ohad Raviv commented on SPARK-18748: Accidentally. I already closed the other ticket as a duplicate > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Ohad Raviv > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = {println("blahblah1"); "nothing"} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > That causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. 
move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we lose some of the optimizations like push-down > predicate features, etc. and it's very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-18747) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv closed SPARK-18747. -- Resolution: Duplicate > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18747 > URL: https://issues.apache.org/jira/browse/SPARK-18747 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Ohad Raviv > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = {println("blahblah1"); "nothing"} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > That causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we lose some of the optimizations like push-down > predicate features, etc. 
and it's very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18748) UDF multiple evaluations causes very poor performance
Ohad Raviv created SPARK-18748: -- Summary: UDF multiple evaluations causes very poor performance Key: SPARK-18748 URL: https://issues.apache.org/jira/browse/SPARK-18748 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1 Reporter: Ohad Raviv We have a use case where we have a relatively expensive UDF that needs to be calculated. The problem is that instead of being calculated once, it gets calculated over and over again. for example: {quote} def veryExpensiveCalc(str:String) = {println("blahblah1"); "nothing"} hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is not null and c<>''").show {quote} with the output: {quote} blahblah1 blahblah1 blahblah1 +---+ | c| +---+ |nothing| +---+ {quote} You can see that for each reference of column "c" you will get the println. That causes very poor performance for our real use case. This also came out on StackOverflow: http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ with two problematic work-arounds: 1. cache() after the first time. e.g. {quote} hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not null and c<>''").show {quote} while it works, in our case we can't do that because the table is too big to cache. 2. move back and forth to rdd: {quote} val df = hiveContext.sql("select veryExpensiveCalc('a') as c") hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and c<>''").show {quote} which works but then we lose some of the optimizations like push-down predicate features, etc. and it's very ugly. Any ideas on how we can make the UDF get calculated just once in a reasonable way? 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18747) UDF multiple evaluations causes very poor performance
Ohad Raviv created SPARK-18747: -- Summary: UDF multiple evaluations causes very poor performance Key: SPARK-18747 URL: https://issues.apache.org/jira/browse/SPARK-18747 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1 Reporter: Ohad Raviv We have a use case where we have a relatively expensive UDF that needs to be calculated. The problem is that instead of being calculated once, it gets calculated over and over again. for example: {quote} def veryExpensiveCalc(str:String) = {println("blahblah1"); "nothing"} hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is not null and c<>''").show {quote} with the output: {quote} blahblah1 blahblah1 blahblah1 +---+ | c| +---+ |nothing| +---+ {quote} You can see that for each reference of column "c" you will get the println. That causes very poor performance for our real use case. This also came out on StackOverflow: http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ with two problematic work-arounds: 1. cache() after the first time. e.g. {quote} hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not null and c<>''").show {quote} while it works, in our case we can't do that because the table is too big to cache. 2. move back and forth to rdd: {quote} val df = hiveContext.sql("select veryExpensiveCalc('a') as c") hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and c<>''").show {quote} which works but then we lose some of the optimizations like push-down predicate features, etc. and it's very ugly. Any ideas on how we can make the UDF get calculated just once in a reasonable way? 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
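The second workaround from the report, written out as a sketch (assuming a `hiveContext` as in the original 1.6.1 setup):

```scala
// Workaround 2: materialize through an RDD so veryExpensiveCalc runs once
// per row before the filter references `c` (at the cost of losing
// optimizations such as predicate pushdown, as noted in the report).
val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
val materialized = hiveContext.createDataFrame(df.rdd, df.schema)
materialized.where("c is not null and c <> ''").show()
```

For what it's worth, later Spark versions (2.3+) offer `asNondeterministic()` on a UDF, which can stop the optimizer from collapsing the projection and re-evaluating the UDF per reference; that option did not exist in 1.6.1.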
[jira] [Commented] (SPARK-17662) Dedup UDAF
[ https://issues.apache.org/jira/browse/SPARK-17662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15672928#comment-15672928 ] Ohad Raviv commented on SPARK-17662: you're right, great solution! I didn't know about the "max(struct(date, *))" syntax. thanks! > Dedup UDAF > -- > > Key: SPARK-17662 > URL: https://issues.apache.org/jira/browse/SPARK-17662 > Project: Spark > Issue Type: New Feature >Reporter: Ohad Raviv > > We have a common use case of deduping a table in creation order. > For example, we have an event log of user actions. A user marks his favorite > category from time to time. > In our analytics we would like to know only the user's last favorite category. > The data: > user_id  action_type  value  date > 123 fav category 1 2016-02-01 > 123 fav category 4 2016-02-02 > 123 fav category 8 2016-02-03 > 123 fav category 2 2016-02-04 > we would like to get only the last update by the date column. > we could of course do it in SQL: > select * from ( > select *, row_number() over (partition by user_id,action_type order by date > desc) as rnum from tbl) > where rnum=1; > but then, I believe it can't be optimized on the mapper side and we'll get > all the data shuffled to the reducers instead of partially aggregated on the > map side. > We have written a UDAF for this, but then we have other issues - like > blocking push-down predicates for columns. > do you have any idea for a proper solution? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
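The `max(struct(date, *))` trick mentioned in the comment can be sketched as follows. A hedged sketch, assuming a `SparkSession` named `spark` and the `tbl` view from the issue; it relies on struct comparison being lexicographic over its fields, so the first field (`date`) drives the ordering:

```scala
// max over a struct compares fields left to right, so putting `date`
// first keeps the latest (date, value) pair per (user_id, action_type).
// Being a plain aggregate, it benefits from map-side partial aggregation,
// unlike the row_number() window approach.
val deduped = spark.sql(
  """SELECT user_id, action_type,
    |       max(struct(date, value)).date  AS date,
    |       max(struct(date, value)).value AS value
    |FROM tbl
    |GROUP BY user_id, action_type""".stripMargin)
```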
[jira] [Created] (SPARK-17662) Dedup UDAF
Ohad Raviv created SPARK-17662:
--
Summary: Dedup UDAF
Key: SPARK-17662
URL: https://issues.apache.org/jira/browse/SPARK-17662
Project: Spark
Issue Type: New Feature
Reporter: Ohad Raviv

We have a common use case of deduping a table by creation order. For example, we have an event log of user actions. A user marks his favorite category from time to time, and in our analytics we would like to know only the user's last favorite category.

The data:
{noformat}
user_id  action_type   value  date
123      fav category  1      2016-02-01
123      fav category  4      2016-02-02
123      fav category  8      2016-02-03
123      fav category  2      2016-02-04
{noformat}
We would like to keep only the last update by the date column. We could, of course, do it in SQL:
{code:sql}
select * from (
  select *, row_number() over (partition by user_id, action_type order by date desc) as rnum
  from tbl)
where rnum = 1;
{code}
but I believe this can't be optimized on the map side, so all the data gets shuffled to the reducers instead of being partially aggregated by the mappers. We have written a UDAF for this, but then we hit other issues, like blocking predicate pushdown for columns. Do you have any idea for a proper solution?
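The "max(struct(date, *))" approach suggested in the comment works because struct comparison is field-by-field, so taking the per-key max is an associative operation that a map-side partial aggregate can apply. A small pure-Python sketch of the idea, using the example data above (splitting the rows into two "mapper" chunks):

```python
def partial_latest(chunk):
    """Map-side partial aggregate: keep the max (date, row) pair per key.
    Comparing (date, row) tuples mimics comparing struct(date, *)."""
    acc = {}
    for user_id, action, value, date in chunk:
        key = (user_id, action)
        cand = (date, (user_id, action, value, date))
        if key not in acc or cand > acc[key]:
            acc[key] = cand
    return acc

def merge(a, b):
    """Reduce-side merge of two partial aggregates; max is associative,
    so only one pre-aggregated record per key is shuffled."""
    out = dict(a)
    for key, cand in b.items():
        if key not in out or cand > out[key]:
            out[key] = cand
    return out

rows = [
    ("123", "fav category", "1", "2016-02-01"),
    ("123", "fav category", "4", "2016-02-02"),
    ("123", "fav category", "8", "2016-02-03"),
    ("123", "fav category", "2", "2016-02-04"),
]

# Two mapper chunks, aggregated independently, then merged.
merged = merge(partial_latest(rows[:2]), partial_latest(rows[2:]))
latest = [row for _, row in merged.values()]
print(latest)  # -> [('123', 'fav category', '2', '2016-02-04')]
```

This is why the struct trick avoids the full shuffle that the row_number() window query requires.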
[jira] [Commented] (SPARK-16976) KCore implementation
[ https://issues.apache.org/jira/browse/SPARK-16976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413461#comment-15413461 ]

Ohad Raviv commented on SPARK-16976:

Well, it's not for MLlib but for GraphX, and it seems very much in the spirit of ConnectedComponents.
[jira] [Created] (SPARK-16976) KCore implementation
Ohad Raviv created SPARK-16976:
--
Summary: KCore implementation
Key: SPARK-16976
URL: https://issues.apache.org/jira/browse/SPARK-16976
Project: Spark
Issue Type: New Feature
Components: GraphX
Reporter: Ohad Raviv
Priority: Minor

Added a K-Core implementation to GraphX. Looks like a quick win: it is a very simple algorithm and can be quite handy for noise filtering. We have already used it a few times.
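For reference, the k-core of a graph is the maximal subgraph in which every vertex has degree at least k, and it can be found by repeatedly peeling off vertices of degree below k. A minimal single-machine Python sketch of that peeling idea (not the GraphX implementation):

```python
from collections import defaultdict

def k_core(edges, k):
    """Return the vertex set of the k-core of an undirected graph,
    by iteratively removing vertices whose degree drops below k."""
    adj = defaultdict(set)
    for u, v in edges:
        adj[u].add(v)
        adj[v].add(u)
    adj = dict(adj)  # freeze: no accidental key creation below
    changed = True
    while changed:
        changed = False
        for v in list(adj):
            if len(adj[v]) < k:
                # Peel v: drop it from its neighbors, then delete it.
                for u in adj[v]:
                    adj[u].discard(v)
                del adj[v]
                changed = True
    return set(adj)

# A triangle (1,2,3) plus a pendant vertex 4: the 2-core drops vertex 4.
edges = [(1, 2), (2, 3), (1, 3), (3, 4)]
print(k_core(edges, 2))  # -> {1, 2, 3}
```

A distributed version would express the same peeling rounds as message-passing iterations, which is why it fits GraphX's Pregel-style API well.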
[jira] [Commented] (SPARK-16820) Sparse - Sparse matrix multiplication
[ https://issues.apache.org/jira/browse/SPARK-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401039#comment-15401039 ]

Ohad Raviv commented on SPARK-16820:

I will create a PR soon with a suggested fix; tell me what you think about it.
[jira] [Created] (SPARK-16821) GraphX MCL algorithm
Ohad Raviv created SPARK-16821:
--
Summary: GraphX MCL algorithm
Key: SPARK-16821
URL: https://issues.apache.org/jira/browse/SPARK-16821
Project: Spark
Issue Type: New Feature
Components: GraphX
Affects Versions: 2.0.0
Reporter: Ohad Raviv

We have had the need to use the MCL clustering algorithm in a project we are working on. We based our implementation on joandre's code: https://github.com/joandre/MCL_spark
We had a few scaling problems that we had to work around ourselves, and opened a separate JIRA on them. Since we started to work on the algorithm, we have been approached a few times by different people who also need it. Do you think you could add this algorithm to the code base? It looks like there isn't any graph clustering algorithm yet.
[jira] [Created] (SPARK-16820) Sparse - Sparse matrix multiplication
Ohad Raviv created SPARK-16820:
--
Summary: Sparse - Sparse matrix multiplication
Key: SPARK-16820
URL: https://issues.apache.org/jira/browse/SPARK-16820
Project: Spark
Issue Type: New Feature
Components: ML
Affects Versions: 2.0.0
Reporter: Ohad Raviv

While working on an MCL implementation on Spark we have encountered some difficulties. The main part of the process is distributed sparse matrix multiplication, which has two main steps:
1. Simulate multiply – preparation before the real multiplication, to determine which blocks should be multiplied.
2. The actual block multiplication and summation.

In our case the sparse matrix has 50M rows and columns, and 2B non-zeros. The current multiplication suffers from these issues:
1. A relatively trivial bug, already fixed, in the first step that caused the process to be very slow [SPARK-16469].
2. Even after the bug fix, if we have too many blocks the simulate multiply takes a very long time and multiplies the data many times (O(n^3), where n is the number of blocks).
3. Spark supports multiplication only with dense matrices. Thus, it converts a sparse matrix into a dense matrix before the multiplication.
4. For summing the intermediate block results Spark uses Breeze's CSC matrix operations – the problem here is that updating a zero entry of a CSC matrix is very inefficient.

That means that with many blocks (the default block size is 1024) – in our case 50M/1024 ~= 50K – the simulate multiply will effectively never finish, or will generate 50K*16GB ~= 1000TB of data. On the other hand, if we use a bigger block size, e.g. 100K, we get an OutOfMemoryException in the "toDense" method of the multiply. We have worked around this by implementing both the sparse multiplication and the addition ourselves, in a very naïve way – but at least it works.
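The naïve workaround described above, multiplying and summing sparse data through a hash map instead of densifying, can be sketched like this (single-machine Python; the coordinate-dict representation is illustrative, not Spark's BlockMatrix format):

```python
from collections import defaultdict

def sparse_matmul(a, b):
    """Multiply two sparse matrices given as {(i, j): value} dicts,
    accumulating into a hash map so zero entries are never materialized
    and never updated in place (avoiding the CSC zero-update problem)."""
    # Index B by row so each entry of A only meets matching B entries.
    b_by_row = defaultdict(list)
    for (k, j), v in b.items():
        b_by_row[k].append((j, v))
    out = defaultdict(float)
    for (i, k), va in a.items():
        for j, vb in b_by_row[k]:
            out[(i, j)] += va * vb
    # Drop exact cancellations so the result stays sparse.
    return {ij: v for ij, v in out.items() if v != 0.0}

# [[1,0],[0,2]] @ [[0,3],[4,0]] = [[0,3],[8,0]]
a = {(0, 0): 1.0, (1, 1): 2.0}
b = {(0, 1): 3.0, (1, 0): 4.0}
print(sparse_matmul(a, b))  # -> {(0, 1): 3.0, (1, 0): 8.0}
```

The same accumulate-into-a-map pattern also gives sparse block addition for free: summing intermediate blocks is just merging their coordinate dicts.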
[jira] [Created] (SPARK-16469) Long running Driver task while multiplying big matrices
Ohad Raviv created SPARK-16469:
--
Summary: Long running Driver task while multiplying big matrices
Key: SPARK-16469
URL: https://issues.apache.org/jira/browse/SPARK-16469
Project: Spark
Issue Type: Improvement
Components: MLlib
Affects Versions: 2.0.0
Reporter: Ohad Raviv
Priority: Minor
Fix For: 2.0.0

We have a use case of multiplying very big sparse matrices: roughly 1000x1000 distributed block matrix multiplications, where the simulate multiply behaves like O(n^4) (n being 1000) and takes about 1.5 hours. We modified it slightly with a classical hash map and it now runs in about 30 seconds, O(n^2).
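The change described, replacing the all-pairs block scan with a hash map keyed on the inner block index, can be sketched as follows (block coordinates are illustrative; each pair (i, k) is the position of a non-empty block):

```python
from collections import defaultdict

def simulate_multiply_naive(a_blocks, b_blocks):
    """All-pairs scan: O(|A| * |B|) comparisons to find which block
    pairs share an inner index and must be multiplied."""
    return [((i, k), (k2, j))
            for (i, k) in a_blocks
            for (k2, j) in b_blocks
            if k == k2]

def simulate_multiply_hashed(a_blocks, b_blocks):
    """Group B's blocks by their row index, then probe once per A block:
    O(|A| + |B| + output) instead of the quadratic scan."""
    b_by_row = defaultdict(list)
    for (k, j) in b_blocks:
        b_by_row[k].append((k, j))
    return [((i, k), bb) for (i, k) in a_blocks for bb in b_by_row[k]]

a_blocks = [(0, 0), (0, 1), (1, 1)]
b_blocks = [(0, 0), (1, 0), (1, 1)]
assert sorted(simulate_multiply_hashed(a_blocks, b_blocks)) == \
       sorted(simulate_multiply_naive(a_blocks, b_blocks))
```

Both functions emit the same candidate pairs; only the join strategy differs, which is where the 1.5 hours vs. 30 seconds gap comes from.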
[jira] [Commented] (SPARK-13313) Strongly connected components doesn't find all strongly connected components
[ https://issues.apache.org/jira/browse/SPARK-13313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199811#comment-15199811 ]

Ohad Raviv commented on SPARK-13313:

Hi, I am trying to use GraphX's SCC and was very concerned by this issue, so I took this dataset and ran it through Python networkx's strongly_connected_components function, and got exactly the same result: 519 SCCs with maximal size 4051. So although I don't know what the true answer is, the fact that both implementations agree makes me believe they are correct. I have also looked at the code and it looks fine to me; I don't agree that the edge direction should be changed on line 89.

> Strongly connected components doesn't find all strongly connected components
>
> Key: SPARK-13313
> URL: https://issues.apache.org/jira/browse/SPARK-13313
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.0
> Reporter: Petar Zecevic
>
> The strongly connected components algorithm doesn't find all strongly connected components. I was using the Wikispeedia dataset (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 SCCs, one of which had 4051 vertices that in reality don't have any edges between them.
> I think the problem could be on line 89 of StronglyConnectedComponents.scala, where EdgeDirection.In should be changed to EdgeDirection.Out. I believe the second Pregel call should use the Out edge direction, the same as the first call, because the direction is reversed in the provided sendMsg function (the message is sent to the source vertex and not the destination vertex).
> If that is changed (line 89), the algorithm starts finding many more SCCs, but eventually a stack overflow exception occurs. I believe graph objects that are changed through iterations should be checkpointed rather than cached.
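For a single-machine cross-check of the kind described in the comment, SCCs can be computed with Kosaraju's two-pass algorithm; a compact Python sketch on a toy graph (not the Wikispeedia data):

```python
def scc_kosaraju(vertices, edges):
    """Kosaraju's algorithm: record DFS finish order on the graph, then
    DFS the reversed graph in reverse finish order; each second-pass
    tree is one strongly connected component."""
    fwd = {v: [] for v in vertices}
    rev = {v: [] for v in vertices}
    for u, v in edges:
        fwd[u].append(v)
        rev[v].append(u)

    order, seen = [], set()
    def dfs1(u):
        seen.add(u)
        for w in fwd[u]:
            if w not in seen:
                dfs1(w)
        order.append(u)  # post-order: u finishes after its descendants
    for v in vertices:
        if v not in seen:
            dfs1(v)

    comps, assigned = [], set()
    for v in reversed(order):
        if v in assigned:
            continue
        comp, stack = set(), [v]
        while stack:  # collect everything reverse-reachable and unassigned
            u = stack.pop()
            if u in assigned:
                continue
            assigned.add(u)
            comp.add(u)
            stack.extend(w for w in rev[u] if w not in assigned)
        comps.append(comp)
    return comps

# Cycle 1->2->3->1 plus a dangling edge 3->4: two SCCs, {1,2,3} and {4}.
comps = scc_kosaraju([1, 2, 3, 4], [(1, 2), (2, 3), (3, 1), (3, 4)])
print(sorted(map(sorted, comps)))  # -> [[1, 2, 3], [4]]
```

This is the same kind of independent oracle as the networkx run above: a trusted sequential implementation against which a distributed result can be compared.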