[jira] [Commented] (SPARK-24528) Add support to read multiple sorted bucket files for data source v1

2021-07-07 Thread Rahij Ramsharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376475#comment-17376475
 ] 

Rahij Ramsharan commented on SPARK-24528:
-

Great, thank you!

> Add support to read multiple sorted bucket files for data source v1
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to SPARK-24410, we're trying to optimize a very common use
> case we have: getting the most recently updated row by id from a fact table.
> We save the table bucketed to skip the shuffle stage, but we still waste time
> on the Sort operator even though the data is already sorted.
> Here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
>    +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> And here's a bad, but more realistic, example:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
>    +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
>       +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
> {code}
> So the code currently just avoids dealing with this situation. Could you
> think of a way to solve this or bypass it?
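
As an aside, the redundant sort described above can be detected programmatically by collecting SortExec nodes from the executed physical plan. Below is a minimal sketch using Spark's public plan APIs; the helper name and the reuse of the table "a1" from the example are assumptions of this sketch, not part of the original report.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SortExec

// Hypothetical helper: returns true if the physical plan for the given query
// contains a Sort operator anywhere in the tree.
def hasSortNode(spark: SparkSession, query: String): Boolean = {
  val plan = spark.sql(query).queryExecution.executedPlan
  plan.collect { case s: SortExec => s }.nonEmpty
}

// Usage, assuming the bucketed, sorted table "a1" from the example exists:
//   spark.sql("set spark.sql.shuffle.partitions=2")
//   hasSortNode(spark, "select max(struct(t1, *)) from a1 group by key")
{code}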



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Add support to read multiple sorted bucket files for data source v1

2021-07-05 Thread Rahij Ramsharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17374872#comment-17374872
 ] 

Rahij Ramsharan commented on SPARK-24528:
-

Hi [~chengsu], I wanted to see if there has been any progress on this since
[https://github.com/apache/spark/pull/29625]. We've been using that PR on our
fork and it has been working fine, but it would be nice to move off of it to a
commit that has been merged instead.

> Add support to read multiple sorted bucket files for data source v1
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to SPARK-24410, we're trying to optimize a very common use
> case we have: getting the most recently updated row by id from a fact table.
> We save the table bucketed to skip the shuffle stage, but we still waste time
> on the Sort operator even though the data is already sorted.
> Here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
>    +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> And here's a bad, but more realistic, example:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
>    +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
>       +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
> {code}
> So the code currently just avoids dealing with this situation. Could you
> think of a way to solve this or bypass it?
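
For context on what reading multiple sorted bucket files would involve: if every file of a bucket is already sorted on the sort columns, the rows can still be consumed in sorted order by merging the per-file iterators, which is what makes the extra Sort avoidable. The following is only an illustrative sketch of that k-way merge idea in plain Scala over generic iterators; it is not the implementation from the linked pull request.

{code:scala}
import scala.collection.mutable

// Merge several iterators that are each already sorted by `ord` into a single
// iterator that is globally sorted by `ord`.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // Min-heap of (current head, source iterator); PriorityQueue is a max-heap,
  // so reverse the ordering on the head element.
  val byHead = Ordering.by[(T, Iterator[T]), T](_._1).reverse
  val heads = mutable.PriorityQueue.empty[(T, Iterator[T])](byHead)
  iters.foreach(it => if (it.hasNext) heads.enqueue((it.next(), it)))

  new Iterator[T] {
    override def hasNext: Boolean = heads.nonEmpty
    override def next(): T = {
      val (value, source) = heads.dequeue()
      if (source.hasNext) heads.enqueue((source.next(), source)) // refill from the same file
      value
    }
  }
}

// Example: three locally sorted "files" of the same bucket.
//   mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6))).toList
//   => List(1, 2, 3, 4, 5, 6, 7)
{code}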






[jira] [Commented] (SPARK-32544) Bucketing and Partitioning information are not passed on to non FileFormat datasource writes

2020-08-06 Thread Rahij Ramsharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17172313#comment-17172313
 ] 

Rahij Ramsharan commented on SPARK-32544:
-

[~hyukjin.kwon] do you happen to know when that is planned to happen?

> Bucketing and Partitioning information are not passed on to non FileFormat 
> datasource writes
> 
>
> Key: SPARK-32544
> URL: https://issues.apache.org/jira/browse/SPARK-32544
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 3.0.0
>Reporter: Rahij Ramsharan
>Priority: Major
>
> When writing to a FileFormat datasource, bucket spec and partition columns 
> are passed into InsertIntoHadoopFsRelationCommand: 
> [https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L474-L475].
>  
> However, from what I can tell, the RelationProvider API does not have a way 
> to pass in these: 
> [https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L511-L513].
>  
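
For reference, the write-side counterpart of RelationProvider is CreatableRelationProvider, and (to the best of my understanding of the org.apache.spark.sql.sources API, reproduced here as a sketch rather than an authoritative copy) its only inputs are the SQLContext, the SaveMode, a flat options map and the DataFrame itself, so there is indeed no parameter through which a bucket spec or partition columns could travel.

{code:scala}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

// Sketch of the DataSource v1 write hook: note the absence of any
// bucketing or partitioning arguments.
trait CreatableRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String], // only a flat string-to-string options map
      data: DataFrame): BaseRelation
}
{code}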






[jira] [Created] (SPARK-32544) Bucketing and Partitioning information are not passed on to non FileFormat datasource writes

2020-08-05 Thread Rahij Ramsharan (Jira)
Rahij Ramsharan created SPARK-32544:
---

 Summary: Bucketing and Partitioning information are not passed on 
to non FileFormat datasource writes
 Key: SPARK-32544
 URL: https://issues.apache.org/jira/browse/SPARK-32544
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output
Affects Versions: 3.0.0
Reporter: Rahij Ramsharan


When writing to a FileFormat datasource, bucket spec and partition columns are 
passed into InsertIntoHadoopFsRelationCommand: 
[https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L474-L475].

 

However, from what I can tell, the RelationProvider API does not have a way to 
pass in these: 
[https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L511-L513].
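
As a writer-side illustration of the asymmetry (a sketch with assumed table, column and source names, not taken from the issue itself): for a file-based source the bucketing and partitioning declared on the DataFrameWriter end up in InsertIntoHadoopFsRelationCommand, whereas a source backed only by the RelationProvider interfaces would receive nothing beyond its options map and the data.

{code:scala}
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("bucketing-sketch").getOrCreate()
val df = spark.range(100).selectExpr("id", "id % 10 as part")

// File-based source: the bucket spec and partition columns are honoured.
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("part")
  .bucketBy(4, "id")
  .sortBy("id")
  .format("parquet")
  .saveAsTable("bucketed_example") // hypothetical table name

// Non-FileFormat v1 source ("my.custom.v1.source" is hypothetical): only the
// options map and the DataFrame reach createRelation, so the bucketing and
// partitioning declared above would have no channel to travel through.
//   df.write.format("my.custom.v1.source").option("path", "/tmp/out").save()
{code}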
 






[jira] [Created] (SPARK-29259) Filesystem.exists is called even when not necessary for append save mode

2019-09-26 Thread Rahij Ramsharan (Jira)
Rahij Ramsharan created SPARK-29259:
---

 Summary: Filesystem.exists is called even when not necessary for 
append save mode
 Key: SPARK-29259
 URL: https://issues.apache.org/jira/browse/SPARK-29259
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.4
Reporter: Rahij Ramsharan


When saving a dataframe into Hadoop
([https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala#L93]),
Spark first checks whether the output path exists before inspecting the SaveMode
to determine if it should actually insert data. However, the pathExists variable
is not used in the case of SaveMode.Append. In some file systems the exists call
can be expensive, so this PR makes that call only when necessary.
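
The gist of the change, as a minimal standalone sketch (the method and variable names below are illustrative, not copied from the actual patch): make the exists() lookup lazy so it is only evaluated for the save modes that branch on it, which lets SaveMode.Append skip the filesystem call entirely.

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// Hypothetical helper illustrating the idea: decide whether to proceed with a
// write without touching the filesystem unless the SaveMode needs the answer.
def shouldWrite(fs: FileSystem, outputPath: Path, mode: SaveMode): Boolean = {
  lazy val pathExists = fs.exists(outputPath) // evaluated at most once, and only if used
  mode match {
    case SaveMode.Append    => true           // no exists() call at all
    case SaveMode.Overwrite => true           // existing data is handled by the delete step
    case SaveMode.ErrorIfExists =>
      if (pathExists) sys.error(s"Path $outputPath already exists.") else true
    case SaveMode.Ignore    => !pathExists    // write only when nothing is there yet
  }
}
{code}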






[jira] [Commented] (SPARK-19528) external shuffle service would close while still have request from executor when dynamic allocation is enabled

2017-07-25 Thread Rahij Ramsharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100357#comment-16100357
 ] 

Rahij Ramsharan commented on SPARK-19528:
-

Hi, has this issue been resolved? I'm seeing something similar to [~KaiXu] on
Spark 2.3.0, Hadoop 2.6.0-cdh5.11.0.

> external shuffle service would close while still have request from executor 
> when dynamic allocation is enabled 
> ---
>
> Key: SPARK-19528
> URL: https://issues.apache.org/jira/browse/SPARK-19528
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle, Spark Core
>Affects Versions: 1.6.2
> Environment: Hadoop2.7.1
> spark1.6.2
> hive2.2
>Reporter: KaiXu
>
> When dynamic allocation is enabled, the external shuffle service is used to
> maintain shuffle state on behalf of executors. So the external shuffle service
> should not close while an executor still has outstanding requests to it.
> container's log:
> {noformat}
> 17/02/09 08:30:46 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.1.1:41867
> 17/02/09 08:30:46 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
> 17/02/09 08:30:46 INFO executor.Executor: Starting executor ID 75 on host hsx-node8
> 17/02/09 08:30:46 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40374.
> 17/02/09 08:30:46 INFO netty.NettyBlockTransferService: Server created on 40374
> 17/02/09 08:30:46 INFO storage.BlockManager: external shuffle service port = 7337
> 17/02/09 08:30:46 INFO storage.BlockManagerMaster: Trying to register BlockManager
> 17/02/09 08:30:46 INFO storage.BlockManagerMaster: Registered BlockManager
> 17/02/09 08:30:46 INFO storage.BlockManager: Registering executor with local external shuffle service.
> 17/02/09 08:30:51 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hsx-node8/192.168.1.8:7337 is closed
> 17/02/09 08:30:51 ERROR storage.BlockManager: Failed to connect to external shuffle server, will retry 2 more times after waiting 5 seconds...
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>   at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>   at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>   at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:144)
>   at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:215)
>   at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:201)
>   at org.apache.spark.executor.Executor.<init>(Executor.scala:86)
>   at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
>   at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
>   at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
>   at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>   at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
>   at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
>   at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
>   at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:274)
>   ... 14 more
> 17/02/09 08:31:01 ERROR storage.BlockManager: Failed to connect to external shuffle server, will retry 1 more times after waiting 5 seconds...
> {noformat}
> nodemanager's log:
> {noformat}
> 2017-02-09 08:30:48,836 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed completed containers from NM context: [container_1486564603520_0097_01_05]
> 2017-02-09 08:31:12,122 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1486564603520_0096_01_71 is : 1
> 2017-02-09 08:31:12,122