[jira] [Updated] (SPARK-41449) Stage level scheduling, allow changing the number of executors

2022-12-08 Thread Shay Elbaz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-41449:
---
Description: 
Since the total/max number of executors is constant throughout the application - 
under both dynamic and static allocation - there is loose control over how many 
GPUs will be requested from the resource manager.

For example, if an application needs 500 executors for the ETL part (with N 
cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
practice it will request at least 500 GPUs from the RM, since 
`spark.executor.instances` is set to 500. This leads to resource management 
challenges in multi-tenant environments.

A quick workaround is to repartition the RDD to 50 partitions just before 
switching resources, but it has obvious downsides. 

It would be very helpful if the total/max number of executors could also be 
configured in the Resource Profile.
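
For context, a minimal sketch of the current stage-level scheduling API (Spark 
3.1+). The resource amounts, the discovery-script path, and `etlOutput` are 
illustrative assumptions; note that nothing in the profile caps the executor 
count, which is exactly what this issue requests:
{code:java}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// DL stage: request 1 GPU per executor and 1 GPU per task (amounts illustrative)
val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .resource("gpu", 1, "/opt/spark/getGpus.sh")
val taskReqs = new TaskResourceRequests().cpus(4).resource("gpu", 1)
val gpuProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// Today, capping the DL stage at ~50 executors means repartitioning to 50
// partitions first - the profile itself offers no total/max executors setting
// (etlOutput is assumed to be an RDD produced by the ETL stage):
val dlInput = etlOutput.repartition(50).withResources(gpuProfile)
{code}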

  was:
Since the total/max number of executors is constant throughout the application - 
under both dynamic and static allocation - there is loose control over how many 
GPUs will be requested from the resource manager.

For example, if an application needs 500 executors for the ETL part (with N 
cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
practice it will request at least 500 GPUs from the RM, since 
`spark.executor.instances` is set to 500. This leads to resource management 
challenges in multi-tenant environments.

A quick workaround is to repartition the RDD to 50 partitions just before 
switching resources, but it has obvious downsides. 

It would be very helpful if the total/max number of executors could also be 
configured in the Resource Profile.


> Stage level scheduling, allow changing the number of executors
> ---
>
> Key: SPARK-41449
> URL: https://issues.apache.org/jira/browse/SPARK-41449
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Shay Elbaz
>Priority: Major
>  Labels: scheduler
>
> Since the total/max number of executors is constant throughout the application 
> - under both dynamic and static allocation - there is loose control over how 
> many GPUs will be requested from the resource manager. 
> For example, if an application needs 500 executors for the ETL part (with N 
> cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
> practice it will request at least 500 GPUs from the RM, since 
> `spark.executor.instances` is set to 500. This leads to resource management 
> challenges in multi-tenant environments.
> A quick workaround is to repartition the RDD to 50 partitions just before 
> switching resources, but it has obvious downsides. 
> It would be very helpful if the total/max number of executors could also be 
> configured in the Resource Profile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41449) Stage level scheduling, allow changing the number of executors

2022-12-08 Thread Shay Elbaz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-41449:
---
Description: 
Since the total/max number of executors is constant throughout the application - 
under both dynamic and static allocation - there is loose control over how many 
GPUs will be requested from the resource manager.

For example, if an application needs 500 executors for the ETL part (with N 
cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
practice it will request at least 500 GPUs from the RM, since 
`spark.executor.instances` is set to 500. This leads to resource management 
challenges in multi-tenant environments.

A quick workaround is to repartition the RDD to 50 partitions just before 
switching resources, but it has obvious downsides. 

It would be very helpful if the total/max number of executors could also be 
configured in the Resource Profile.

  was:
Since the (max) number of executors is constant throughout the application - 
under both dynamic and static allocation - there is loose control over how many 
GPUs will be requested from the resource manager.

For example, if an application needs 500 executors for the ETL part (with N 
cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
practice it will request at least 500 GPUs from the RM, since 
`spark.executor.instances` is set to 500. This leads to resource management 
challenges in multi-tenant environments.

A quick workaround is to repartition the RDD to 50 partitions, but it has 
obvious downsides. 

It would be very helpful if the total/max number of executors could also be 
configured in the Resource Profile.


> Stage level scheduling, allow changing the number of executors
> ---
>
> Key: SPARK-41449
> URL: https://issues.apache.org/jira/browse/SPARK-41449
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Shay Elbaz
>Priority: Major
>  Labels: scheduler
>
> Since the total/max number of executors is constant throughout the application 
> - under both dynamic and static allocation - there is loose control over how 
> many GPUs will be requested from the resource manager. 
> For example, if an application needs 500 executors for the ETL part (with N 
> cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
> practice it will request at least 500 GPUs from the RM, since 
> `spark.executor.instances` is set to 500. This leads to resource management 
> challenges in multi-tenant environments.
> A quick workaround is to repartition the RDD to 50 partitions just before 
> switching resources, but it has obvious downsides. 
> It would be very helpful if the total/max number of executors could also be 
> configured in the Resource Profile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41449) Stage level scheduling, allow to change number of executors

2022-12-08 Thread Shay Elbaz (Jira)
Shay Elbaz created SPARK-41449:
--

 Summary: Stage level scheduling, allow changing the number of 
executors
 Key: SPARK-41449
 URL: https://issues.apache.org/jira/browse/SPARK-41449
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 3.3.1, 3.3.0
Reporter: Shay Elbaz


Since the (max) number of executors is constant throughout the application - 
under both dynamic and static allocation - there is loose control over how many 
GPUs will be requested from the resource manager.

For example, if an application needs 500 executors for the ETL part (with N 
cores each), but needs - *or is allowed -* only 50 GPUs for the DL part, in 
practice it will request at least 500 GPUs from the RM, since 
`spark.executor.instances` is set to 500. This leads to resource management 
challenges in multi-tenant environments.

A quick workaround is to repartition the RDD to 50 partitions, but it has 
obvious downsides. 

It would be very helpful if the total/max number of executors could also be 
configured in the Resource Profile.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32578) PageRank not sending the correct values in Pregel sendMessage

2020-09-15 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195948#comment-17195948
 ] 

Shay Elbaz commented on SPARK-32578:


It turned out the problem was in my benchmark, sorry about that.

> PageRank not sending the correct values in Pregel sendMessage
> -
>
> Key: SPARK-32578
> URL: https://issues.apache.org/jira/browse/SPARK-32578
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.3.0, 2.4.0, 3.0.0
>Reporter: Shay Elbaz
>Priority: Major
>
> The core sendMessage method is incorrect:
> {code:java}
> def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
>  if (edge.srcAttr._2 > tol) {
>Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
>   // *** THIS ^ ***
>  } else {
>Iterator.empty
>  }
> }{code}
>  
> Instead of using the source PR value, it's using the PR delta (2nd tuple 
> arg). This is not the documented behavior, nor a valid PR algorithm AFAIK.
> This code is 7 years old; all versions are affected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32578) PageRank not sending the correct values in Pregel sendMessage

2020-08-09 Thread Shay Elbaz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-32578:
---
Description: 
The core sendMessage method is incorrect:
{code:java}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
 if (edge.srcAttr._2 > tol) {
   Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
  // *** THIS ^ ***
 } else {
   Iterator.empty
 }
}{code}
 

Instead of using the source PR value, it's using the PR delta (2nd tuple arg). 
This is not the documented behavior, nor a valid PR algorithm AFAIK.

This code is 7 years old; all versions are affected.
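
For reference, a sketch of what the description argues the documented behavior 
would look like - propagating the source vertex's rank (the tuple's first 
element) instead of the delta. Note that the 2020-09-15 comment above 
attributes the observed discrepancy to the reporter's own benchmark, so the 
shipped delta-based code may well be intentional:
{code:java}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
  if (edge.srcAttr._2 > tol) {
    // srcAttr._1 is the vertex's PageRank value; srcAttr._2 is the last delta
    Iterator((edge.dstId, edge.srcAttr._1 * edge.attr))
  } else {
    Iterator.empty
  }
}{code}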

 

 

  was:
The core sendMessage method is incorrect:
{code:java}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
 if (edge.srcAttr._2 > tol) {
   Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
 //  THIS ^
 } else {
   Iterator.empty
 }
}{code}
 

Instead of sending the source PR value, it sends the PR delta. This is not the 
documented behavior, nor a valid PR algorithm AFAIK.

This code is 7 years old; all versions are affected.


> PageRank not sending the correct values in Pregel sendMessage
> -
>
> Key: SPARK-32578
> URL: https://issues.apache.org/jira/browse/SPARK-32578
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.3.0, 2.4.0, 3.0.0
>Reporter: Shay Elbaz
>Priority: Major
>
> The core sendMessage method is incorrect:
> {code:java}
> def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
>  if (edge.srcAttr._2 > tol) {
>Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
>   // *** THIS ^ ***
>  } else {
>Iterator.empty
>  }
> }{code}
>  
> Instead of using the source PR value, it's using the PR delta (2nd tuple 
> arg). This is not the documented behavior, nor a valid PR algorithm AFAIK.
> This code is 7 years old; all versions are affected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32578) PageRank not sending the correct values in Pregel sendMessage

2020-08-09 Thread Shay Elbaz (Jira)
Shay Elbaz created SPARK-32578:
--

 Summary: PageRank not sending the correct values in Pregel 
sendMessage
 Key: SPARK-32578
 URL: https://issues.apache.org/jira/browse/SPARK-32578
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 3.0.0, 2.4.0, 2.3.0
Reporter: Shay Elbaz


The core sendMessage method is incorrect:
{code:java}
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
 if (edge.srcAttr._2 > tol) {
   Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
 //  THIS ^
 } else {
   Iterator.empty
 }
}{code}
 

Instead of sending the source PR value, it sends the PR delta. This is not the 
documented behavior, nor a valid PR algorithm AFAIK.

This code is 7 years old; all versions are affected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27318) Join operation on bucketing table fails with base adaptive enabled

2020-07-22 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162641#comment-17162641
 ] 

Shay Elbaz edited comment on SPARK-27318 at 7/22/20, 11:54 AM:
---

 

Was able to reproduce on 2.4.3, without external files/data.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// create bucketed table and avoid SPARK-24343:
scala> spark.range(1, 1 << 30, 1, 200).repartition(400, 
$"id").write.bucketBy(400, 
"id").sortBy("id").mode("overwrite").saveAsTable("test.buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `test`.`buckets` into Hive metastore in Spark SQL specific format, which 
is NOT compatible with Hive.

// join:
scala> spark.range(1000).join(spark.table("test.buckets"), Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 


was (Author: shay_elbaz):
 

Was able to reproduce on 2.4.3, without external files/data.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 

> Join operation on bucketing table fails with base adaptive enabled
> --
>
> Key: SPARK-27318
> URL: https://issues.apache.org/jira/browse/SPARK-27318
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Supritha
>Priority: Major
>
> Join operation on bucketed tables is failing.
> Steps to reproduce the issue:
> {code}
> spark.sql("set spark.sql.adaptive.enabled=true")
> {code}
> 1. Create tables bucket3 and bucket4 as below and load the data:
> {code}
> sql("create table bucket3(id3 int,country3 String, sports3 String) row format 
> delimited fields terminated by ','").show()
> sql("create table bucket4(id4 int,country4 String) row format delimited 
> fields terminated by ','").show()
> sql("load data local inpath '/opt/abhidata/bucket2.txt' into table 
> bucket3").show()
> sql("load data local inpath '/opt/abhidata/bucket3.txt' into table 
> bucket4").show()
> {code}
> 2. Create the bucketed tables as below:
> {code}
> spark.sqlContext.table("bucket3").write.bucketBy(3, 
> "id3").saveAsTable("bucketed_table_3");
> spark.sqlContext.table("bucket4").write.bucketBy(4, 
> "id4").saveAsTable("bucketed_table_4");
> {code}
> 3. Execute the join query on the bucketed tables:
> {code}
> sql("select * from bucketed_table_3 join bucketed_table_4 on 
> bucketed_table_3.id3 = bucketed_table_4.id4").show()
> {code}
>  
> {code:java}
> java.lang.IllegalArgumentException: requirement failed: 
> PartitioningCollection requires all of its partitionings have the same 
> numPartitions. at scala.Predef$.require(Predef.scala:224) at 
> org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionA

[jira] [Comment Edited] (SPARK-27318) Join operation on bucketing table fails with base adaptive enabled

2020-07-22 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162641#comment-17162641
 ] 

Shay Elbaz edited comment on SPARK-27318 at 7/22/20, 10:14 AM:
---

 

Was able to reproduce on 2.4.3, without external files/data.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 


was (Author: shay_elbaz):
 

Was able to reproduce on 2.4.3, without external files/data.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 

> Join operation on bucketing table fails with base adaptive enabled
> --
>
> Key: SPARK-27318
> URL: https://issues.apache.org/jira/browse/SPARK-27318
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Supritha
>Priority: Major
>
> Join operation on bucketed tables is failing.
> Steps to reproduce the issue:
> {code}
> spark.sql("set spark.sql.adaptive.enabled=true")
> {code}
> 1. Create tables bucket3 and bucket4 as below and load the data:
> {code}
> sql("create table bucket3(id3 int,country3 String, sports3 String) row format 
> delimited fields terminated by ','").show()
> sql("create table bucket4(id4 int,country4 String) row format delimited 
> fields terminated by ','").show()
> sql("load data local inpath '/opt/abhidata/bucket2.txt' into table 
> bucket3").show()
> sql("load data local inpath '/opt/abhidata/bucket3.txt' into table 
> bucket4").show()
> {code}
> 2. Create the bucketed tables as below:
> {code}
> spark.sqlContext.table("bucket3").write.bucketBy(3, 
> "id3").saveAsTable("bucketed_table_3");
> spark.sqlContext.table("bucket4").write.bucketBy(4, 
> "id4").saveAsTable("bucketed_table_4");
> {code}
> 3. Execute the join query on the bucketed tables:
> {code}
> sql("select * from bucketed_table_3 join bucketed_table_4 on 
> bucketed_table_3.id3 = bucketed_table_4.id4").show()
> {code}
>  
> {code:java}
> java.lang.IllegalArgumentException: requirement failed: 
> PartitioningCollection requires all of its partitionings have the same 
> numPartitions. at scala.Predef$.require(Predef.scala:224) at 
> org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
>  at 
> org.apach

[jira] [Comment Edited] (SPARK-27318) Join operation on bucketing table fails with base adaptive enabled

2020-07-22 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162641#comment-17162641
 ] 

Shay Elbaz edited comment on SPARK-27318 at 7/22/20, 9:37 AM:
--

 

Was able to reproduce on 2.4.3, without external files/data.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 


was (Author: shay_elbaz):
 

Was able to reproduce on 2.4.3.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
 

 

> Join operation on bucketing table fails with base adaptive enabled
> --
>
> Key: SPARK-27318
> URL: https://issues.apache.org/jira/browse/SPARK-27318
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Supritha
>Priority: Major
>
> Join operation on bucketed tables is failing.
> Steps to reproduce the issue:
> {code}
> spark.sql("set spark.sql.adaptive.enabled=true")
> {code}
> 1. Create tables bucket3 and bucket4 as below and load the data:
> {code}
> sql("create table bucket3(id3 int,country3 String, sports3 String) row format 
> delimited fields terminated by ','").show()
> sql("create table bucket4(id4 int,country4 String) row format delimited 
> fields terminated by ','").show()
> sql("load data local inpath '/opt/abhidata/bucket2.txt' into table 
> bucket3").show()
> sql("load data local inpath '/opt/abhidata/bucket3.txt' into table 
> bucket4").show()
> {code}
> 2. Create the bucketed tables as below:
> {code}
> spark.sqlContext.table("bucket3").write.bucketBy(3, 
> "id3").saveAsTable("bucketed_table_3");
> spark.sqlContext.table("bucket4").write.bucketBy(4, 
> "id4").saveAsTable("bucketed_table_4");
> {code}
> 3. Execute the join query on the bucketed tables:
> {code}
> sql("select * from bucketed_table_3 join bucketed_table_4 on 
> bucketed_table_3.id3 = bucketed_table_4.id4").show()
> {code}
>  
> {code:java}
> java.lang.IllegalArgumentException: requirement failed: 
> PartitioningCollection requires all of its partitionings have the same 
> numPartitions. at scala.Predef$.require(Predef.scala:224) at 
> org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchang

[jira] [Commented] (SPARK-27318) Join operation on bucketing table fails with base adaptive enabled

2020-07-22 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162641#comment-17162641
 ] 

Shay Elbaz commented on SPARK-27318:


 

Was able to reproduce on 2.4.3.

Executed via spark-shell on an HDP 2.6.5 cluster:

 
{code:java}
// create bucketed table:
scala> spark.range(1000).repartition(4, $"id").write.bucketBy(4, 
"id").sortBy("id").mode("overwrite").saveAsTable("buckets")
20/07/22 02:26:25 WARN HiveExternalCatalog: Persisting bucketed data source 
table `shay_test`.`buckets` into Hive metastore in Spark SQL specific format, 
which is NOT compatible with Hive.

// join with pre-partitioned df:
scala> spark.range(100).repartition(4, $"id").join(spark.table("buckets"), 
Seq("id")).explain
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
  at scala.Predef$.require(Predef.scala:224)
  at 
org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
  at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
  at 
org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
...
...
...
{code}
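
Two mitigations that may sidestep the failure, offered here as assumptions 
rather than a confirmed fix:
{code:java}
// revert the setting from step 1 of the original report:
spark.conf.set("spark.sql.adaptive.enabled", "false")

// or disable bucketed reads entirely, forcing a regular shuffle on both sides:
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")
{code}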
 

 

> Join operation on bucketing table fails with base adaptive enabled
> --
>
> Key: SPARK-27318
> URL: https://issues.apache.org/jira/browse/SPARK-27318
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Supritha
>Priority: Major
>
> Join operation on bucketed tables is failing.
> Steps to reproduce the issue:
> {code}
> spark.sql("set spark.sql.adaptive.enabled=true")
> {code}
> 1. Create tables bucket3 and bucket4 as below and load the data:
> {code}
> sql("create table bucket3(id3 int,country3 String, sports3 String) row format 
> delimited fields terminated by ','").show()
> sql("create table bucket4(id4 int,country4 String) row format delimited 
> fields terminated by ','").show()
> sql("load data local inpath '/opt/abhidata/bucket2.txt' into table 
> bucket3").show()
> sql("load data local inpath '/opt/abhidata/bucket3.txt' into table 
> bucket4").show()
> {code}
> 2. Create the bucketed tables as below:
> {code}
> spark.sqlContext.table("bucket3").write.bucketBy(3, 
> "id3").saveAsTable("bucketed_table_3");
> spark.sqlContext.table("bucket4").write.bucketBy(4, 
> "id4").saveAsTable("bucketed_table_4");
> {code}
> 3. Execute the join query on the bucketed tables:
> {code}
> sql("select * from bucketed_table_3 join bucketed_table_4 on 
> bucketed_table_3.id3 = bucketed_table_4.id4").show()
> {code}
>  
> {code:java}
> java.lang.IllegalArgumentException: requirement failed: 
> PartitioningCollection requires all of its partitionings have the same 
> numPartitions. at scala.Predef$.require(Predef.scala:224) at 
> org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:291)
>  at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:150)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:392) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:296) at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:149)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
>  at 
> org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:296)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:282)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:282)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:281) 
> at 
> org

[jira] [Commented] (SPARK-30399) Bucketing is not compatible with partitioning in practice

2020-06-07 Thread Shay Elbaz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127548#comment-17127548
 ] 

Shay Elbaz commented on SPARK-30399:


Hi [~hyukjin.kwon], thanks for replying. 

Perhaps the issue was misunderstood - 
Spark will fail to read from a bucketed table with partitions, and there is no 
way around it. This might happen only after some partitions are added (if not 
immediately), so users might encounter this only after deployment.

Please consider this again.

> Bucketing is not compatible with partitioning in practice
> ---
>
> Key: SPARK-30399
> URL: https://issues.apache.org/jira/browse/SPARK-30399
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: HDP 2.7
>Reporter: Shay Elbaz
>Priority: Minor
>
> When using Spark bucketed tables, Spark uses as many partitions as the number 
> of buckets for the map-side join 
> (_FileSourceScanExec.createBucketedReadRDD_). This works great for "static" 
> tables, but is quite disastrous for _time-partitioned_ tables. In our use 
> case, a daily-partitioned key-value table grows by 100GB every day, so after 
> 100 days there are 10TB of data we want to join with. For this scenario, we 
> need thousands of buckets if we want every task to successfully *read and 
> sort* all of its data in a map-side join. But in such a case, every daily 
> increment would emit thousands of small files, leading to other big issues.
> In practice, and hoping for some hidden optimization, we set the number of 
> buckets to 1000 and backfilled such a table with 10TB. When trying to join 
> with the smallest input, every executor was killed by Yarn due to 
> over-allocating memory in the sorting phase. Even without such failures, it 
> would take every executor an unreasonable amount of time to locally sort all 
> its data.
> A question on SO remained unanswered for a while, so I thought I'd ask here - 
> is it by design that buckets cannot be used in time-partitioned tables, or am 
> I doing something wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30399) Bucketing is not compatible with partitioning in practice

2019-12-31 Thread Shay Elbaz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-30399:
---
Description: 
When using Spark Bucketed table, Spark would use as many partitions as the 
number of buckets for the map-side join 
(_FileSourceScanExec.createBucketedReadRDD_). This works great for "static" 
tables, but quite disastrous for _time-partitioned_ tables. In our use case, a 
daily partitioned key-value table is added 100GB of data every day. So in 100 
days there are 10TB of data we want to join with. Aiming to this scenario, we 
need thousands of buckets if we want every task to successfully *read and sort* 
all of it's data in a map-side join. But in such case, every daily increment 
would emit thousands of small files, leading to other big issues.

In practice, and with a hope for some hidden optimization, we set the number of 
buckets to 1000 and backfilled such a table with 10TB. When trying to join with 
the smallest input, every executor was killed by Yarn due to over allocating 
memory in the sorting phase. Even without such failures, it would take every 
executor unreasonably amount of time to locally sort all its data.

A question on SO remained unanswered for a while, so I thought asking here - is 
it by design that buckets cannot be used in time-partitioned table, or am I 
doing something wrong?
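
To make the setup concrete, a sketch of the write pattern described above - the 
table name, column names, and sizes are illustrative assumptions:
{code:java}
// daily increment: ~100GB appended into a single date partition, bucketed on
// the join key:
newDay.write
  .partitionBy("dt")
  .bucketBy(1000, "key")
  .sortBy("key")
  .mode("append")
  .saveAsTable("kv_daily")
{code}
Each day now emits up to 1000 files into its partition (the small-files 
problem), yet a bucketed read still feeds bucket i of *every* partition into a 
single task, which must read and sort its slice of the full 10TB.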

  was:
When using Spark bucketed tables, Spark uses as many partitions as the number 
of buckets for the map-side join (_FileSourceScanExec.createBucketedReadRDD_). 
This works great for "static" tables, but is quite disastrous for 
_time-partitioned_ tables. In our use case, a daily-partitioned key-value table 
grows by 100GB every day, so after 100 days there are 10TB of data we want to 
join with. For this scenario, we need thousands of buckets if we want every 
task to successfully *read and sort* all of its data in a map-side join. But in 
such a case, every daily increment would emit thousands of small files, leading 
to other big issues.

In practice, and hoping for some hidden optimization, we set the number of 
buckets to 1000 and backfilled such a table with 10TB. When trying to join with 
the smallest input, every executor was killed by Yarn due to over-allocating 
memory in the sorting phase. Even without such failures, it would take every 
executor an unreasonable amount of time to locally sort all its data.

A question on SO remained unanswered for a while, so I thought I'd ask here - is 
it by design that buckets cannot be used in time-partitioned tables, or am I 
doing something wrong?


> Bucketing is not compatible with partitioning in practice
> ---
>
> Key: SPARK-30399
> URL: https://issues.apache.org/jira/browse/SPARK-30399
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: HDP 2.7
>Reporter: Shay Elbaz
>Priority: Minor
>
> When using Spark bucketed tables, Spark uses as many partitions as the number 
> of buckets for the map-side join 
> (_FileSourceScanExec.createBucketedReadRDD_). This works great for "static" 
> tables, but is quite disastrous for _time-partitioned_ tables. In our use 
> case, a daily-partitioned key-value table grows by 100GB every day, so after 
> 100 days there are 10TB of data we want to join with. For this scenario, we 
> need thousands of buckets if we want every task to successfully *read and 
> sort* all of its data in a map-side join. But in such a case, every daily 
> increment would emit thousands of small files, leading to other big issues.
> In practice, and hoping for some hidden optimization, we set the number of 
> buckets to 1000 and backfilled such a table with 10TB. When trying to join 
> with the smallest input, every executor was killed by Yarn due to 
> over-allocating memory in the sorting phase. Even without such failures, it 
> would take every executor an unreasonable amount of time to locally sort all 
> its data.
> A question on SO remained unanswered for a while, so I thought I'd ask here - 
> is it by design that buckets cannot be used in time-partitioned tables, or am 
> I doing something wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30399) Bucketing is not compatible with partitioning in practice

2019-12-31 Thread Shay Elbaz (Jira)
Shay Elbaz created SPARK-30399:
--

 Summary: Bucketing is not compatible with partitioning in 
practice
 Key: SPARK-30399
 URL: https://issues.apache.org/jira/browse/SPARK-30399
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
 Environment: HDP 2.7
Reporter: Shay Elbaz


When using Spark bucketed tables, Spark uses as many partitions as the number 
of buckets for the map-side join (_FileSourceScanExec.createBucketedReadRDD_). 
This works great for "static" tables, but is quite disastrous for 
_time-partitioned_ tables. In our use case, a daily-partitioned key-value table 
grows by 100GB every day, so after 100 days there are 10TB of data we want to 
join with. For this scenario, we need thousands of buckets if we want every 
task to successfully *read and sort* all of its data in a map-side join. But in 
such a case, every daily increment would emit thousands of small files, leading 
to other big issues.

In practice, and hoping for some hidden optimization, we set the number of 
buckets to 1000 and backfilled such a table with 10TB. When trying to join with 
the smallest input, every executor was killed by Yarn due to over-allocating 
memory in the sorting phase. Even without such failures, it would take every 
executor an unreasonable amount of time to locally sort all its data.

A question on SO remained unanswered for a while, so I thought I'd ask here - is 
it by design that buckets cannot be used in time-partitioned tables, or am I 
doing something wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30089) count over Window function with orderBy gives wrong results

2019-12-01 Thread Shay Elbaz (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-30089:
---
Description: 
Please consider the following data, where *event_id* has 5 non-unique values, 
and *time* is a boolean-like (0/1) value:
{code:java}
val df = spark
  .range(20)
  .drop("id")
  .withColumn("event_id", (rand() * 5).cast("int"))
  .withColumn("secondary_key", rand())
  .withColumn("time", (rand() * 2).cast("int"))
{code}
output:
{noformat}
++---++
|event_id|secondary_key  |time|
++---++
|4   |0.9772771523180686 |0   |
|2   |0.9334658337212178 |1   |
|3   |0.19471070128057155|0   |
|3   |0.7199139320519544 |1   |
|0   |0.2950226274440527 |0   |
|1   |0.26756419276811183|0   |
|0   |0.8505002394080461 |1   |
|2   |0.43758689359535163|1   |
|1   |0.9328102324257992 |0   |
|2   |0.9829272033815031 |0   |
|3   |0.4579860738704702 |0   |
|1   |0.9220937240097472 |1   |
|1   |0.5145520547685413 |0   |
|2   |0.11314407779922231|0   |
|2   |0.42837936719991054|1   |
|3   |0.23501843822326307|1   |
|2   |0.20053336248248554|0   |
|3   |0.07781415213387388|0   |
|3   |0.633004353367962  |0   |
|3   |0.4071317068782465 |0   |
++---++{noformat}
 

Now we would like to get the event index within every *time* value, and also 
the total number of rows per *time*:
{code:java}
val spec = Window.partitionBy("time").orderBy("event_id", "secondary_key")
df
  .withColumn("event_index", row_number().over(spec))
  .withColumn("events_in_time", count("event_id").over(spec))
  .show(false)
{code}
 

It seems that _orderBy_ has some side effect, as per this output (see 
events_in_time):
{noformat}
++---++---+--+
|event_id|secondary_key  |time|event_index|events_in_time|
++---++---+--+
|0   |0.46503911208798054|1   |1  |1 |
|1   |0.39987355658705015|1   |2  |2 |
|1   |0.5691951918819504 |1   |3  |3 |
|2   |0.07400147458165662|1   |4  |4 |
|2   |0.7592681952170066 |1   |5  |5 |
|3   |0.02912532019167091|1   |6  |6 |
|3   |0.8055599468620407 |1   |7  |7 |
|4   |0.2145552471806751 |1   |8  |8 |
|4   |0.9898589033586774 |1   |9  |9 |
|0   |0.39486528440812896|0   |1  |1 |
|1   |0.2861869575899465 |0   |2  |2 |
|1   |0.83560556569591   |0   |3  |3 |
|2   |0.09764393740040855|0   |4  |4 |
|2   |0.1372111795261538 |0   |5  |5 |
|2   |0.18723423836738395|0   |6  |6 |
|2   |0.5326764866419712 |0   |7  |7 |
|3   |0.93985884066349   |0   |8  |8 |
|3   |0.9956976178321568 |0   |9  |9 |
|4   |0.6508676154889343 |0   |10 |10|
|4   |0.6664965696641834 |0   |11 |11|
++---++---+--+{noformat}
 

I expected to see only 2 distinct values in events_in_time - the per-partition 
totals for time=0 and time=1 (11 and 9 in the output above). *We do get the 
expected results when omitting orderBy from the window spec,* which leads me to 
believe there _is_ a bug in this plan.
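
One possible explanation, stated here as an assumption rather than a confirmed 
resolution: with an orderBy, the default window frame is RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW, so count() runs cumulatively. An explicit 
unbounded frame yields one total per partition:
{code:java}
val fullSpec = Window
  .partitionBy("time")
  .orderBy("event_id", "secondary_key")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("events_in_time", count("event_id").over(fullSpec)).show(false)
{code}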

  was:
Please consider the following data, where *event_id* has 5 non-unique values, 
and *time* is a boolean-like (0/1) value:
{code:java}
val df = spark
  .range(20)
  .drop("id")
  .withColumn("event_id", (rand() * 5).cast("int"))
  .withColumn("secondary_key", rand())
  .withColumn("time", (rand() * 2).cast("int"))
{code}
output:
{noformat}
++---++
|event_id|secondary_key  |time|
++---++
|4   |0.9772771523180686 |0   |
|2   |0.9334658337212178 |1   |
|3   |0.19471070128057155|0   |
|3   |0.7199139320519544 |1   |
|0   |0.2950226274440527 |0   |
|1   |0.26756419276811183|0   |
|0   |0.8505002394080461 |1   |
|2   |0.43758689359535163|1   |
|1   |0.9328102324257992 |0   |
|2   |0.9829272033815031 |0   |
|3   |0.4579860738704702 |0   |
|1   |0.9220937240097472 |1   |
|1   |0.5145520547685413 |0   |
|2   |0.11314407779922231|0   |
|2   |0.42837936719991054|1   |
|3   |0.23501843822326307|1   |
|2   |0.20053336248248554|0   |
|3   |0.07781415213387388|0   |
|3   |0.633004353367962  |0   |
|3   |0.4071317068782465 |0   |
++---++{noformat}

Now we would like to get the event index within every *time* value, and also 
the total number of rows per *time*:
{code:java}
val spec = Window.partitionBy("time").orderBy("event_i

[jira] [Created] (SPARK-30089) count over Window function with orderBy gives wrong results

2019-12-01 Thread Shay Elbaz (Jira)
Shay Elbaz created SPARK-30089:
--

 Summary: count over Window function with orderBy gives wrong 
results
 Key: SPARK-30089
 URL: https://issues.apache.org/jira/browse/SPARK-30089
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Shay Elbaz


Please consider the following data, where *event_id* has 5 non-unique values, 
and *time* is a boolean-like (0/1) value:
{code:java}
val df = spark
  .range(20)
  .drop("id")
  .withColumn("event_id", (rand() * 5).cast("int"))
  .withColumn("secondary_key", rand())
  .withColumn("time", (rand() * 2).cast("int"))
{code}
output:
{noformat}
++---++
|event_id|secondary_key  |time|
++---++
|4   |0.9772771523180686 |0   |
|2   |0.9334658337212178 |1   |
|3   |0.19471070128057155|0   |
|3   |0.7199139320519544 |1   |
|0   |0.2950226274440527 |0   |
|1   |0.26756419276811183|0   |
|0   |0.8505002394080461 |1   |
|2   |0.43758689359535163|1   |
|1   |0.9328102324257992 |0   |
|2   |0.9829272033815031 |0   |
|3   |0.4579860738704702 |0   |
|1   |0.9220937240097472 |1   |
|1   |0.5145520547685413 |0   |
|2   |0.11314407779922231|0   |
|2   |0.42837936719991054|1   |
|3   |0.23501843822326307|1   |
|2   |0.20053336248248554|0   |
|3   |0.07781415213387388|0   |
|3   |0.633004353367962  |0   |
|3   |0.4071317068782465 |0   |
++---++{noformat}

Now we would like to get the event index within every *time* value, and also 
the total number of rows per *time*:
{code:java}
val spec = Window.partitionBy("time").orderBy("event_id", "secondary_key")
df
  .withColumn("event_index", row_number().over(spec))
  .withColumn("events_in_time", count("event_id").over(spec))
  .show(false)
{code}

It seems that _orderBy_ has some side effect, as per this output (see 
events_in_time):
{noformat}
++---++---+--+
|event_id|secondary_key  |time|event_index|events_in_time|
++---++---+--+
|0   |0.46503911208798054|1   |1  |1 |
|1   |0.39987355658705015|1   |2  |2 |
|1   |0.5691951918819504 |1   |3  |3 |
|2   |0.07400147458165662|1   |4  |4 |
|2   |0.7592681952170066 |1   |5  |5 |
|3   |0.02912532019167091|1   |6  |6 |
|3   |0.8055599468620407 |1   |7  |7 |
|4   |0.2145552471806751 |1   |8  |8 |
|4   |0.9898589033586774 |1   |9  |9 |
|0   |0.39486528440812896|0   |1  |1 |
|1   |0.2861869575899465 |0   |2  |2 |
|1   |0.83560556569591   |0   |3  |3 |
|2   |0.09764393740040855|0   |4  |4 |
|2   |0.1372111795261538 |0   |5  |5 |
|2   |0.18723423836738395|0   |6  |6 |
|2   |0.5326764866419712 |0   |7  |7 |
|3   |0.93985884066349   |0   |8  |8 |
|3   |0.9956976178321568 |0   |9  |9 |
|4   |0.6508676154889343 |0   |10 |10|
|4   |0.6664965696641834 |0   |11 |11|
++---++---+--+{noformat}
I expected to see only 2 distinct values in events_in_time - the per-partition 
totals for time=0 and time=1 (11 and 9 in the output above). *We do get the 
expected results when omitting orderBy from the window spec,* which leads me to 
believe there _is_ a bug in this plan.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26438) Driver waits to spark.sql.broadcastTimeout before throwing OutOfMemoryError - is this by design?

2018-12-25 Thread Shay Elbaz (JIRA)
Shay Elbaz created SPARK-26438:
--

 Summary: Driver waits to spark.sql.broadcastTimeout before 
throwing OutOfMemoryError - is this by design?
 Key: SPARK-26438
 URL: https://issues.apache.org/jira/browse/SPARK-26438
 Project: Spark
  Issue Type: Question
  Components: Spark Core, SQL
Affects Versions: 2.3.0
Reporter: Shay Elbaz


When broadcasting a DataFrame that is too large, the driver does not fail 
immediately when the broadcast thread throws OutOfMemoryError. Instead, it 
waits for `spark.sql.broadcastTimeout` to elapse. Is that by design, or a bug?
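
A minimal sketch of the reported behavior (the sizes are illustrative 
assumptions; the default `spark.sql.broadcastTimeout` is 300 seconds):
{code:java}
import org.apache.spark.sql.functions.broadcast

// deliberately broadcast far more data than the driver heap can hold:
val huge = spark.range(1L << 27).selectExpr("id", "repeat('x', 100) AS pad")
spark.range(10).join(broadcast(huge), "id").count()
// reported behavior: the broadcast thread throws OutOfMemoryError almost
// immediately, but the query only fails after the 300s broadcast timeout fires
{code}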



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19256) Hive bucketing support

2018-11-27 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700067#comment-16700067
 ] 

Shay Elbaz commented on SPARK-19256:


[~chengsu] this is great! If there is anything I can do to assist please ask :)

> Hive bucketing support
> --
>
> Key: SPARK-19256
> URL: https://issues.apache.org/jira/browse/SPARK-19256
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Tejas Patil
>Priority: Minor
>
> JIRA to track design discussions and tasks related to Hive bucketing support 
> in Spark.
> Proposal : 
> https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19256) Hive bucketing support

2018-10-09 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643828#comment-16643828
 ] 

Shay Elbaz commented on SPARK-19256:


+1

[~tejasp] is this still in progress?

> Hive bucketing support
> --
>
> Key: SPARK-19256
> URL: https://issues.apache.org/jira/browse/SPARK-19256
> Project: Spark
>  Issue Type: Umbrella
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Tejas Patil
>Priority: Minor
>
> JIRA to track design discussions and tasks related to Hive bucketing support 
> in Spark.
> Proposal : 
> https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1678#comment-1678
 ] 

Shay Elbaz commented on SPARK-24904:


[~mgaido] Technically you *can* do that, you just need an additional shuffle 
(after the map-side join) to fill in the missing rows, as you mentioned. And 
since the current implementation already shuffles, I don't see how it makes 
sense to involve the entire big table in the shuffle. Instead, Spark could do 
the following:
 # Broadcast the small table.
 # Just like an inner join, load the big table and hash-join. The output is 
(expected to be) very small compared to the big table.
 # Keep the small table broadcasted, and shuffle the results from the last 
stage (say, sort-merge).
 # Now, on each task, fill in the missing rows from the broadcasted table. This 
is trivial if using sort-merge, where the broadcasted table is just another 
partition to merge.

As I mentioned in the description, this can be achieved by the user with 2 
joins (sketched below), but shouldn't Spark offer this by default? Needless to 
say how sub-optimal the current implementation is compared to the above plan. 
Am I missing something?
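
A sketch of that user-level two-join workaround, using the example's 
`small`/`big` frames (illustrative only):
{code:java}
// step 1: broadcast inner join - only the matching big-table rows survive:
val inner = big.join(broadcast(small), $"id2" === $"id", "inner").drop("id")

// step 2: left join the small side back in to restore its unmatched rows,
// producing the same rows a right outer join would have:
val rightOuterEquivalent = small.join(inner, $"id" === $"id2", "left")
{code}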

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan does not 
> include broadcasting the small table. But when the join is on the large DF 
> side, the broadcast does take place. Is there a good reason for this? In the 
> below example it sure doesn't make any sense to shuffle the entire large 
> table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of a right join, 
> and then join the result back with the small DF to fill in the missing rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-24904:
---
Issue Type: Improvement  (was: Question)

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan does not 
> include broadcasting the small table. But when the join is on the large DF 
> side, the broadcast does take place. Is there a good reason for this? In the 
> below example it sure doesn't make any sense to shuffle the entire large 
> table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of a right join, 
> and then join the result back with the small DF to fill in the missing rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555769#comment-16555769
 ] 

Shay Elbaz commented on SPARK-24904:


[~mgaido] indeed, this assumption is not always true. However, the map-side 
result will always be smaller than or equal to the big table, which is why I 
think this approach is better. Kind of a filter pushdown :)

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with a broadcasted small one, where the join 
> type preserves the small DF's side (see the right outer join below), the 
> physical plan falls back to a sort merge join. But when the join preserves the 
> large DF's side, the broadcast does take place. Is there a good reason for 
> this? In the example below it certainly doesn't make sense to shuffle the 
> entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id2#16307L, 1000)
> :     +- *Project [id#16304L AS id2#16307L]
> :        +- *Range (1, 1073741824, step=1, splits=Some(600))
> +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#16310L, 1000)
>       +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of the right 
> join, and then join the result back with the small DF to fill in the missing 
> rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-24904:
---
Description: 
When joining a "large" dataframe with a broadcasted small one, where the join 
type preserves the small DF's side (see the right outer join below), the 
physical plan falls back to a sort merge join. But when the join preserves the 
large DF's side, the broadcast does take place. Is there a good reason for 
this? In the example below it certainly doesn't make sense to shuffle the 
entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain

//OUTPUT:
== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id2#16307L, 1000)
:     +- *Project [id#16304L AS id2#16307L]
:        +- *Range (1, 1073741824, step=1, splits=Some(600))
+- *Sort [id#16310L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16310L, 1000)
      +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform an inner join instead of the right join, 
and then join the result back with the small DF to fill in the missing rows, as 
sketched below.
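
A minimal sketch of this workaround, assuming the `small` and `big` DataFrames 
from the example above in a spark-shell session; `matched` and 
`rightOuterEquivalent` are hypothetical names:

{code:java}
import spark.implicits._
import org.apache.spark.sql.functions.broadcast

// Inner broadcast join: only the matching rows of `big` survive,
// and the large table is never shuffled.
val matched = big.join(broadcast(small), $"id" === $"id2", "inner")

// Join `small` back against the matches to re-introduce its unmatched
// rows, recovering the right-outer result. `matched` may still be
// large, so this second join is deliberately left un-broadcast.
val rightOuterEquivalent =
  small.join(matched.drop("id"), $"id" === $"id2", "left")
{code}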

  was:
When joining a "large" dataframe with a broadcasted small one, where the join 
type preserves the small DF's side (see the right outer join below), the 
physical plan does not include broadcasting the small table. But when the join 
preserves the large DF's side, the broadcast does take place. Is there a good 
reason for this? In the example below it certainly doesn't make sense to 
shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain

//OUTPUT:
== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id2#16307L, 1000)
:     +- *Project [id#16304L AS id2#16307L]
:        +- *Range (1, 1073741824, step=1, splits=Some(600))
+- *Sort [id#16310L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16310L, 1000)
      +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform an inner join instead of the right join, 
and then join the result back with the small DF to fill in the missing rows.


> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with a broadcasted small one, where the join 
> type preserves the small DF's side (see the right outer join below), the 
> physical plan falls back to a sort merge join. But when the join preserves the 
> large DF's side, the broadcast does take place. Is there a good reason for 
> this? In the example below it certainly doesn't make sense to shuffle the 
> entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id2#16307L, 1000)
> :     +- *Project [id#16304L AS id2#16307L]
> :        +- *Range (1, 1073741824, step=1, splits=Some(600))
> +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#16310L, 1000)
>       +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of the right 
> join, and then join the result back with the small DF to fill in the missing 
> rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-24 Thread Shay Elbaz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-24904:
---
Description: 
When joining a "large" dataframe with a broadcasted small one, where the join 
type preserves the small DF's side (see the right outer join below), the 
physical plan does not include broadcasting the small table. But when the join 
preserves the large DF's side, the broadcast does take place. Is there a good 
reason for this? In the example below it certainly doesn't make sense to 
shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain

//OUTPUT:
== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id2#16307L, 1000)
:     +- *Project [id#16304L AS id2#16307L]
:        +- *Range (1, 1073741824, step=1, splits=Some(600))
+- *Sort [id#16310L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16310L, 1000)
      +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform an inner join instead of the right join, 
and then join the result back with the small DF to fill in the missing rows.

  was:
When joining a "large" dataframe with a broadcasted small one, where the join 
type preserves the small DF's side (see the right outer join below), the 
physical plan does not include broadcasting the small table. But when the join 
preserves the large DF's side, the broadcast does take place. Is there a good 
reason for this? In the example below it certainly doesn't make sense to 
shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain


== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id2#16307L, 1000)
:     +- *Project [id#16304L AS id2#16307L]
:        +- *Range (1, 1073741824, step=1, splits=Some(600))
+- *Sort [id#16310L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16310L, 1000)
      +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform an inner join instead of the right join, 
and then join the result back with the small DF to fill in the missing rows.


> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with a broadcasted small one, where the join 
> type preserves the small DF's side (see the right outer join below), the 
> physical plan does not include broadcasting the small table. But when the join 
> preserves the large DF's side, the broadcast does take place. Is there a good 
> reason for this? In the example below it certainly doesn't make sense to 
> shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id2#16307L, 1000)
> :     +- *Project [id#16304L AS id2#16307L]
> :        +- *Range (1, 1073741824, step=1, splits=Some(600))
> +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#16310L, 1000)
>       +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform an inner join instead of the right 
> join, and then join the result back with the small DF to fill in the missing 
> rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-24 Thread Shay Elbaz (JIRA)
Shay Elbaz created SPARK-24904:
--

 Summary: Join with broadcasted dataframe causes shuffle of 
redundant data
 Key: SPARK-24904
 URL: https://issues.apache.org/jira/browse/SPARK-24904
 Project: Spark
  Issue Type: Question
  Components: SQL
Affects Versions: 2.1.2
Reporter: Shay Elbaz


When joining a "large" dataframe with a broadcasted small one, where the join 
type preserves the small DF's side (see the right outer join below), the 
physical plan does not include broadcasting the small table. But when the join 
preserves the large DF's side, the broadcast does take place. Is there a good 
reason for this? In the example below it certainly doesn't make sense to 
shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain


== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id2#16307L, 1000)
:     +- *Project [id#16304L AS id2#16307L]
:        +- *Range (1, 1073741824, step=1, splits=Some(600))
+- *Sort [id#16310L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#16310L, 1000)
      +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform an inner join instead of the right join, 
and then join the result back with the small DF to fill in the missing rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.

2018-02-19 Thread Shay Elbaz (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369112#comment-16369112
 ] 

Shay Elbaz commented on SPARK-5377:
---

+1

This seems like a very useful improvement and would save us many of our current 
workarounds.

Is there a specific reason why this was closed?

> Dynamically add jar into Spark Driver's classpath.
> --
>
> Key: SPARK-5377
> URL: https://issues.apache.org/jira/browse/SPARK-5377
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Chengxiang Li
>Priority: Major
>
> Spark supports dynamically adding a jar to the executor classpath through 
> SparkContext::addJar(), but it does not support dynamically adding a jar to 
> the driver classpath. In most cases (if not all), users dynamically add a jar 
> with SparkContext::addJar() because some classes from the jar will be 
> referenced in an upcoming Spark job, which means the classes need to be loaded 
> on the Spark driver side as well, e.g. during serialization. I think it makes 
> sense to add an API that adds a jar to the driver classpath, or just to make 
> SparkContext::addJar() do this. HIVE-9410 is a real case from Hive on Spark.
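
A minimal sketch of the asymmetry described above, assuming a running 
SparkContext `sc`; the jar path and class name are hypothetical:

{code:java}
// Ships the jar to the executors, so upcoming tasks can load its classes.
sc.addJar("hdfs:///libs/udfs.jar")

// The driver's classloader is not updated, so resolving a class from that
// jar on the driver side still fails with ClassNotFoundException:
// Class.forName("com.example.MyUdf")
{code}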



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org