[jira] [Commented] (SPARK-28304) FileFormatWriter introduces an unconditional sort, even when all attributes are constants

2019-07-20 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889436#comment-16889436
 ] 

Eyal Farago commented on SPARK-28304:
-

[~joshrosen], thanks for your comment.

I think this is a broader subject than just FileFormatWriter, as any sort 
operator can either simplify its ordering or be eliminated entirely when 
some or all of the sort columns are known to be constant.

Furthermore, in some cases one ordering can be used to satisfy several 
orderings (or, conversely, the ordering requirements of downstream operators 
can be relaxed), so I believe this is best handled by the optimizer/planner, 
essentially by making EnsureRequirements aware of these kinds of cases. As a 
short-term fix, the SortExec operator could filter out constant ordering 
columns in its execute() method and, if no ordering columns remain, bypass 
the sort altogether.

BTW, is there any reason FileFormatWriter doesn't go through the regular 
optimization/planning path?
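
A minimal sketch of what such a rule could look like (this is not existing Spark 
code; the rule name is made up, and foldable literals stand in for "known to be 
constant" — constants derived from filters would additionally need constraint 
propagation):
{code:scala}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical optimizer rule: drop sort keys whose expressions are foldable
// (i.e. constant), and remove the Sort node entirely if no keys remain.
object EliminateConstantSortKeys extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case sort @ Sort(order, _, child) =>
      val nonConstant = order.filterNot(_.child.foldable)
      if (nonConstant.isEmpty) child                          // nothing left to sort by
      else if (nonConstant.size < order.size) sort.copy(order = nonConstant)
      else sort                                               // unchanged
  }
}
{code}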

 

> FileFormatWriter introduces an unconditional sort, even when all attributes 
> are constants
> 
>
> Key: SPARK-28304
> URL: https://issues.apache.org/jira/browse/SPARK-28304
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Eyal Farago
>Priority: Major
>  Labels: performance
>
> FileFormatWriter derives a required sort order based on the partition 
> columns, bucketing columns and any explicitly required ordering. However, in 
> some use cases some (or even all) of these fields are constant, in which 
> case the sort can be skipped.
> E.g., in my use case we add a GUID column identifying a specific 
> (incremental) load; this can be thought of as a batch id. Since we run one 
> batch at a time, this column is always a constant, which means there's no 
> need to sort on it. Since we don't use bucketing or require an explicit 
> ordering, the entire sort can be skipped in our case.
>  
> I suggest:
>  # filtering away constant columns from the required ordering calculated by 
> FileFormatWriter.
>  # generalizing this to any Sort operator in a Spark plan.
>  # introducing optimizer rules to remove constants from sort orderings, 
> potentially eliminating the sort operator altogether.
>  # modifying EnsureRequirements to be aware of constant fields when deciding 
> whether to introduce a sort or not.






[jira] [Commented] (SPARK-28304) FileFormatWriter introduces an unconditional sort, even when all attributes are constants

2019-07-08 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880402#comment-16880402
 ] 

Eyal Farago commented on SPARK-28304:
-

cc [~cloud_fan],[~hvanhovell]

> FileFormatWriter introduces an unconditional sort, even when all attributes 
> are constants
> 
>
> Key: SPARK-28304
> URL: https://issues.apache.org/jira/browse/SPARK-28304
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Eyal Farago
>Priority: Major
>  Labels: performance
>
> FileFormatWriter derives a required sort order based on the partition 
> columns, bucketing columns and any explicitly required ordering. However, in 
> some use cases some (or even all) of these fields are constant, in which 
> case the sort can be skipped.
> E.g., in my use case we add a GUID column identifying a specific 
> (incremental) load; this can be thought of as a batch id. Since we run one 
> batch at a time, this column is always a constant, which means there's no 
> need to sort on it. Since we don't use bucketing or require an explicit 
> ordering, the entire sort can be skipped in our case.
>  
> I suggest:
>  # filtering away constant columns from the required ordering calculated by 
> FileFormatWriter.
>  # generalizing this to any Sort operator in a Spark plan.
>  # introducing optimizer rules to remove constants from sort orderings, 
> potentially eliminating the sort operator altogether.
>  # modifying EnsureRequirements to be aware of constant fields when deciding 
> whether to introduce a sort or not.






[jira] [Updated] (SPARK-28304) FileFormatWriter introduces an unconditional sort, even when all attributes are constants

2019-07-08 Thread Eyal Farago (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eyal Farago updated SPARK-28304:

Summary: FileFormatWriter introduces an unconditional sort, even when all 
attributes are constants  (was: FileFormatWriter introduces an unconditional 
join, even when all attributes are constants)

> FileFormatWriter introduces an unconditional sort, even when all attributes 
> are constants
> 
>
> Key: SPARK-28304
> URL: https://issues.apache.org/jira/browse/SPARK-28304
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Eyal Farago
>Priority: Major
>  Labels: performance
>
> FileFormatWriter derives a required sort order based on the partition 
> columns, bucketing columns and any explicitly required ordering. However, in 
> some use cases some (or even all) of these fields are constant, in which 
> case the sort can be skipped.
> E.g., in my use case we add a GUID column identifying a specific 
> (incremental) load; this can be thought of as a batch id. Since we run one 
> batch at a time, this column is always a constant, which means there's no 
> need to sort on it. Since we don't use bucketing or require an explicit 
> ordering, the entire sort can be skipped in our case.
>  
> I suggest:
>  # filtering away constant columns from the required ordering calculated by 
> FileFormatWriter.
>  # generalizing this to any Sort operator in a Spark plan.
>  # introducing optimizer rules to remove constants from sort orderings, 
> potentially eliminating the sort operator altogether.
>  # modifying EnsureRequirements to be aware of constant fields when deciding 
> whether to introduce a sort or not.






[jira] [Created] (SPARK-28304) FileFormatWriter introduces an unconditional join, even when all attributes are constants

2019-07-08 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-28304:
---

 Summary: FileFormatWriter introduces an unconditional join, even 
when all attributes are constants
 Key: SPARK-28304
 URL: https://issues.apache.org/jira/browse/SPARK-28304
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.2
Reporter: Eyal Farago


FileFormatWriter derives a required sort order based on the partition columns, 
bucketing columns and any explicitly required ordering. However, in some use 
cases some (or even all) of these fields are constant, in which case the sort 
can be skipped.

E.g., in my use case we add a GUID column identifying a specific (incremental) 
load; this can be thought of as a batch id. Since we run one batch at a time, 
this column is always a constant, which means there's no need to sort on it. 
Since we don't use bucketing or require an explicit ordering, the entire sort 
can be skipped in our case.

I suggest:
 # filtering away constant columns from the required ordering calculated by 
FileFormatWriter.
 # generalizing this to any Sort operator in a Spark plan.
 # introducing optimizer rules to remove constants from sort orderings, 
potentially eliminating the sort operator altogether.
 # modifying EnsureRequirements to be aware of constant fields when deciding 
whether to introduce a sort or not.






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2019-05-15 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840498#comment-16840498
 ] 

Eyal Farago commented on SPARK-24437:
-

[~mgaido], looking at this again, I suspect that in this case losing the 
broadcast does not necessarily mean losing the lineage, as the broadcast was 
built by executing a plan and that plan is still available even when the 
broadcast isn't.

I think the usage of broadcast variables inside Spark SQL is different from 
using them directly in the RDD API. In the RDD API the broadcast data 
originates at the driver and Spark has no way of reconstructing it; in the 
Spark SQL scenario the broadcast data is built by Spark, and the knowledge of 
how to reconstruct it is still available. So in the rare event that both the 
broadcast and part of the cached partitions are lost, it's possible to 
reconstruct the broadcast value (by executing the plan that produced it in 
the first place) and then recompute the missing partitions.

I think it's at least theoretically possible to identify cached plans based 
on broadcast data and change the relevant operators to reference the 
broadcast variable via a soft/weak reference; unfortunately, I believe it 
would turn out to be quite difficult, especially in the face of 
non-deterministic operators.
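
The soft/weak reference idea, in isolation, is just a generic JVM pattern; a toy 
illustration (not Spark code; `build` is a hypothetical thunk that would 
re-execute the plan producing the broadcast data):
{code:scala}
import java.lang.ref.SoftReference

// Holds a value through a SoftReference so the JVM may reclaim it under memory
// pressure, rebuilding it transparently on the next access.
class RebuildableRef[T <: AnyRef](build: () => T) {
  @volatile private var ref = new SoftReference[T](build())

  def get: T = Option(ref.get()).getOrElse {
    val value = build()                    // e.g. re-execute the producing plan
    ref = new SoftReference[T](value)
    value
  }
}
{code}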

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Major
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-17556) Executor side broadcast for broadcast joins

2019-03-08 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788206#comment-16788206
 ] 

Eyal Farago commented on SPARK-17556:
-

Why was this abandoned?

[~viirya]'s pull request seems promising.

I think the last comment by [~LI,Xiao] applies to the current implementation 
as well, since executors hold the entire broadcast anyway (assuming they ran 
a task that used it), so the memory footprint on the executor side doesn't 
change. Regarding the performance regression in the case of many smaller 
partitions, this also applies to the current implementation, as the RDD 
partitions have to be computed and transferred to the driver.

One thing I personally think could be improved in [~viirya]'s PR is the 
requirement for the RDD to be pre-persisted; I think the blocks could be 
evaluated in the mapPartitions operation performed in the newly introduced 
RDD.broadcast method, which would have addressed most of [~holdenk_amp]'s 
comments on the PR.

> Executor side broadcast for broadcast joins
> ---
>
> Key: SPARK-17556
> URL: https://issues.apache.org/jira/browse/SPARK-17556
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core, SQL
>Reporter: Reynold Xin
>Priority: Major
> Attachments: executor broadcast.pdf, executor-side-broadcast.pdf
>
>
> Currently in Spark SQL, in order to perform a broadcast join, the driver must 
> collect the result of an RDD and then broadcast it. This introduces some 
> extra latency. It might be possible to broadcast directly from executors.






[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

2018-12-28 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730301#comment-16730301
 ] 

Eyal Farago commented on SPARK-22579:
-

Glanced over this on my phone: it seems SPARK-25905 only addresses the 
reading side. When the executor serving the block has all the values in 
memory, a similar issue happens on that executor as well.

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be 
> implemented using streaming
> --
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Spark Core
>Affects Versions: 2.1.0
>Reporter: Eyal Farago
>Priority: Major
>
> When an RDD partition is cached on an executor but the task requiring it is 
> running on another executor (process locality ANY), the cached partition is 
> fetched via BlockManager.getRemoteValues, which delegates to 
> BlockManager.getRemoteBytes; both calls are blocking.
> In my use case I had a 700 GB RDD spread over 1000 partitions on a 6-node 
> cluster, cached to disk. Rough math shows that the average partition size is 
> 700 MB.
> Looking at the Spark UI it was obvious that tasks running with process 
> locality 'ANY' were much slower than local tasks (~40 seconds vs. 8-10 
> minutes). I was able to capture thread dumps of executors executing remote 
> tasks and got this stack trace:
> {quote}Thread ID  Thread Name Thread StateThread Locks
> 1521  Executor task launch worker-1000WAITING 
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> Digging into the code showed that the block manager first fetches all the 
> bytes (getRemoteBytes) and only then wraps them with a deserialization 
> stream. This has several drawbacks:
> 1. Blocking: the requesting executor is blocked while the remote executor is 
> serving the block.
> 2. Potentially large memory footprint on the requesting executor; in my use 
> case, 700 MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all the values at 
> once, as it consumes them via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block 
> is cached in deserialized form, the serving executor has to serialize it into 
> a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory- and 
> CPU-intensive; the memory footprint could be reduced by using a limited 
> serialization buffer that 'spills' to the response stream.
> I suggest improving this either by implementing a full streaming mechanism or 
> some kind of pagination; in addition, the requesting executor should be able 
> to make progress with the data it already has, blocking only when its local 
> buffer is exhausted and the remote side hasn't delivered the next chunk of 
> the stream (or page, in the case of pagination) yet.
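
An illustrative sketch of the streaming idea on the reading side (not Spark's 
actual serializer or block-transfer API; `openRemoteBlockStream` is a 
hypothetical function, and plain Java serialization is used only to show the 
shape of the approach):
{code:scala}
import java.io.{BufferedInputStream, EOFException, InputStream, ObjectInputStream}

// Consume a remote block as a lazy iterator over the incoming stream instead of
// materializing all of its bytes into a ChunkedByteBuffer first.
def deserializeLazily[T](openRemoteBlockStream: () => InputStream): Iterator[T] = {
  val in = new ObjectInputStream(new BufferedInputStream(openRemoteBlockStream()))
  Iterator
    .continually {
      try Some(in.readObject().asInstanceOf[T])
      catch { case _: EOFException => in.close(); None }   // end of block reached
    }
    .takeWhile(_.isDefined)
    .map(_.get)
}
{code}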




[jira] [Commented] (SPARK-17403) Fatal Error: Scan cached strings

2018-11-10 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682591#comment-16682591
 ] 

Eyal Farago commented on SPARK-17403:
-

[~paul_lysak], [~hvanhovell],

Please notice that the exception happens when building the column buffers, so 
the corrupt string comes from upstream; in the repro code this is the join, 
not necessarily an in-memory relation.

[~hvanhovell], I'd suspect that the upstream operator somehow overwrites the 
row/buffer returned from the Iterator.next method, while the downstream code 
(the in-memory column-building code) doesn't take the necessary precaution of 
copying it. If this is the case, I'd suspect the issue is somehow related to 
code generation, either in the join or in the InMemoryTableScanExec operator.

Another option: [~paul_lysak] mentioned that the problem happens when the 
application is 'big enough'. Given that a Dataset's default persistence level 
is _memory and disk_, is it possible that Spark starts evicting these blocks 
from memory and then bad things start happening?
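
To illustrate the copy concern (hypothetical snippet; `rows` stands for the 
iterator handed to the buffering operator, whose elements may all point at a 
single reused UnsafeRow):
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow

// Buffering without copying may leave every slot aliasing the same mutable row.
def bufferUnsafe(rows: Iterator[InternalRow]): Array[InternalRow] =
  rows.toArray                        // unsafe if the upstream operator reuses its row

// Copying each row before buffering avoids the corruption.
def bufferSafe(rows: Iterator[InternalRow]): Array[InternalRow] =
  rows.map(_.copy()).toArray          // each element is now an independent row
{code}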

> Fatal Error: Scan cached strings
> 
>
> Key: SPARK-17403
> URL: https://issues.apache.org/jira/browse/SPARK-17403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: Spark standalone cluster (3 Workers, 47 cores)
> Ubuntu 14
> Java 8
>Reporter: Ruben Hernando
>Priority: Major
>
> The process creates views from JDBC (SQL server) source and combines them to 
> create other views.
>  Finally it dumps results via JDBC
> Error:
> {quote} # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 
> 1.8.0_101-b13)
>  # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode 
> linux-amd64 )
>  # Problematic frame:
>  # J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J 
> (9 bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc]
>  #{quote}
> SQL Query plan (fields truncated):
> {noformat}
> == Parsed Logical Plan ==
> 'Project [*]
> +- 'UnresolvedRelation `COEQ_63`
> == Analyzed Logical Plan ==
> InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int
> Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, 
> priceId#20244]
> +- SubqueryAlias coeq_63
>+- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
> price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
>   +- SubqueryAlias 6__input
>  +- 
> Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON#
>  .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], 
> [SLTables]. ... [...]  FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables 
> TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where 
> _TP = 24) input)
> def pers
> == Optimized Logical Plan ==
> Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
> price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
> +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, 
> _TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, 
> StorageLevel(disk, memory, deserialized, 1 replicas)
>:  +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], 
> [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], 
> [SLTables].[Name], [SLTables].[TableP_DCID], [SLTables].[TableSHID], 
> [TPSLTables].[SL_ACT_GI_DTE],  ... [...] FROM [sch].[SLTables] [SLTables] 
> JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = 
> [SLTables].[_TableSL_SID] where _TP = 24) input) 
> [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,...
>  36 more fields] 
> == Physical Plan ==
> *Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
> price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
> +- InMemoryTableScan [_TableSL_SID#143L, SL_RD_ColR_N#189]
>:  +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, 
> _TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, 
> StorageLevel(disk, memory, deserialized, 1 replicas)
>: :  +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], 
> [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], 
> [SLTables].[Name], [SLTables].[TableP_DCID],  ... [...] FROM [sch].[SLTables] 
> [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = 
> [SLTables].[_TableSL_SID] where _TP = 24) input) 
> [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,,... 
> 36 more fields]
> {noformat}




[jira] [Updated] (SPARK-17403) Fatal Error: Scan cached strings

2018-11-10 Thread Eyal Farago (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eyal Farago updated SPARK-17403:

Description: 
The process creates views from JDBC (SQL server) source and combines them to 
create other views.
 Finally it dumps results via JDBC

Error:
{quote} # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 
1.8.0_101-b13)
 # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode 
linux-amd64 )
 # Problematic frame:
 # J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 
bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc]
 #{quote}
SQL Query plan (fields truncated):
{noformat}
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `COEQ_63`

== Analyzed Logical Plan ==
InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int
Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, 
priceId#20244]
+- SubqueryAlias coeq_63
   +- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
  +- SubqueryAlias 6__input
 +- 
Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON#
 .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], 
[SLTables]. ... [...]  FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables 
TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP 
= 24) input)
def pers
== Optimized Logical Plan ==
Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
+- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, _TableSH_SID#145L, 
ID#146, Name#147, ... 36 more fields], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], 
[SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], 
[SLTables].[Name], [SLTables].[TableP_DCID], [SLTables].[TableSHID], 
[TPSLTables].[SL_ACT_GI_DTE],  ... [...] FROM [sch].[SLTables] [SLTables] JOIN 
sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = 
[SLTables].[_TableSL_SID] where _TP = 24) input) 
[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,...
 36 more fields] 

== Physical Plan ==
*Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
+- InMemoryTableScan [_TableSL_SID#143L, SL_RD_ColR_N#189]
   :  +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, 
_TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, 
StorageLevel(disk, memory, deserialized, 1 replicas)
   : :  +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], 
[SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], 
[SLTables].[Name], [SLTables].[TableP_DCID],  ... [...] FROM [sch].[SLTables] 
[SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = 
[SLTables].[_TableSL_SID] where _TP = 24) input) 
[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,,... 
36 more fields]
{noformat}

  was:
The process creates views from JDBC (SQL server) source and combines them to 
create other views.
Finally it dumps results via JDBC

Error:
{quote}

# JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 
1.8.0_101-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode linux-amd64 
)
# Problematic frame:
# J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 
bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc]
#

{quote}

SQL Query plan (fields truncated):

{noformat}
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `COEQ_63`

== Analyzed Logical Plan ==
InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int
Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, 
priceId#20244]
+- SubqueryAlias coeq_63
   +- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS 
price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244]
  +- SubqueryAlias 6__input
 +- 
Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON#
 .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], 
[SLTables]. ... [...]  FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables 
TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP 
= 24) input)

== Optimized Logical Plan ==
Pro

[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-08 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679840#comment-16679840
 ] 

Eyal Farago commented on SPARK-24437:
-

[~dvogelbacher], I think what you actually want is somewhat described in 
SPARK-17556, which seems to be stuck at the moment.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-08 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679560#comment-16679560
 ] 

Eyal Farago commented on SPARK-24437:
-

[~dvogelbacher], what about the _checkpoint_ approach?

Another possibility: if the queries are actually rather small, can you force 
them into memory, convert them into _Dataset_s and cache those? This way you 
get rid of the broadcasts and lineage completely, effectively storing only 
what you need. This still has a minor drawback: your Datasets are now built 
on top of a parallelized-collection RDD, which still has a memory footprint 
in the driver's heap.

Re. your question about why the broadcast is kept as part of the lineage: it 
would require a long trip down the rabbit hole to understand how the plan is 
transformed and represented once it's cached... As you wrote yourself, this 
is a rather unusual use case, so it might require unusual handling on your 
side...
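
A hedged sketch of both suggestions (`spark` is a SparkSession, `df` a 
hypothetical small DataFrame, and the tuple schema is made up):
{code:scala}
// Option 1: checkpoint() truncates the lineage (and the captured broadcast with
// it), trading disk space in the checkpoint directory for recoverability.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
val truncated = df.checkpoint()

// Option 2: for genuinely small results, collect to the driver and rebuild a
// Dataset from the materialized rows, dropping the original lineage entirely.
import spark.implicits._
val materialized = df.as[(Long, String)].collect().toSeq.toDS().cache()
{code}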

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-05 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675719#comment-16675719
 ] 

Eyal Farago commented on SPARK-24437:
-

[~dvogelbacher], this is a bit puzzling.

Spark will usually choose to broadcast a relation only if it's small enough; 
is this not the case here? I'd usually expect the join to be larger than its 
components. Can you specify the sizes here? How many queries are cached at 
any given moment?

A few workarounds I can think of: tweak the broadcast threshold configuration 
key; I don't remember the exact details, but I think you can disable 
broadcasting altogether (see the snippet below).

Another option might be the _checkpoint_ operation, which trades disk space 
for a truncated lineage; according to the Scaladocs it's 'evolving' and 
available since Spark 2.1, so it might be worth a try.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-05 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675347#comment-16675347
 ] 

Eyal Farago commented on SPARK-24437:
-

[~mgaido], I think we agree on most of the details :)

I do think that once the plan has already been executed, keeping the 
broadcast around is a bit of a waste, as this data is effectively cached as 
part of the cached relation.

On the other hand, 'losing' the broadcast means losing the ability to recover 
from node failures (Spark relies on the RDD lineage for that), so I guess 
it's the least bad way of caching this relation.

BTW, [~dvogelbacher], what's consuming most of the memory in your scenario? 
Is it the broadcast variables or the cached relations? I'd expect the latter 
to be much heavier in most use cases.

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-25548) In the PruneFileSourcePartitions optimizer, replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps) to make the partition can be pruned

2018-11-03 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673957#comment-16673957
 ] 

Eyal Farago commented on SPARK-25548:
-

[~eaton], I think there are two possible approaches to handle this:

The first would be extracting the partition predicate and _And_-ing it with 
the original predicate:
{code:java}
select * from src_par where 
(P_d in (2,3)) and 
((p_d=2 and key=2) or (p_d=3 and key=3))
{code}
The second approach would be transforming this into a union:
{code:java}
select * from src_par where (p_d=2 and key=2) 
UNION ALL
select * from src_par where (p_d=3 and key=3)
{code}
I think the second approach is easier to implement, but it would require 
additional rules to make sure partitions are not scanned multiple times; 
e.g., consider what would happen if your predicate looked like this:
{code:java}
 (p_d=2 and key=2) or (p_d=3 and key=3) or (p_d=2 and key=33)
{code}
A naive approach would scan partition #2 twice, while it's pretty obvious 
this can be avoided by _OR_-ing the first and third conditions.

The first approach seems a bit more complicated, but I think it somewhat 
resembles what you've started implementing in your PR. [~cloud_fan], your 
thoughts?

> In the PruneFileSourcePartitions optimizer, replace the nonPartitionOps field 
> with true in the And(partitionOps, nonPartitionOps) to make the partition can 
> be pruned
> -
>
> Key: SPARK-25548
> URL: https://issues.apache.org/jira/browse/SPARK-25548
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: eaton
>Assignee: Apache Spark
>Priority: Critical
>
> In the PruneFileSourcePartitions optimizer, the partition files will not be 
> pruned if we use a partition filter and a non-partition filter together, for 
> example:
> sql("CREATE TABLE IF NOT EXISTS src_par (key INT, value STRING) partitioned 
> by(p_d int) stored as parquet ")
>  sql("insert overwrite table src_par partition(p_d=2) select 2 as key, '4' as 
> value")
>  sql("insert overwrite table src_par partition(p_d=3) select 3 as key, '4' as 
> value")
>  sql("insert overwrite table src_par partition(p_d=4) select 4 as key, '4' as 
> value")
> The SQL below will scan all the partition files, even though the partition 
> **p_d=4** should be pruned.
>  **sql("select * from src_par where (p_d=2 and key=2) or (p_d=3 and 
> key=3)").show**






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-11-03 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673949#comment-16673949
 ] 

Eyal Farago commented on SPARK-24437:
-

[~dvogelbacher],

I haven't looked too deeply into this, but here are two immediate conclusions 
from the screenshot you've attached:
 # The broadcast is referenced from MapPartitionsRDD's f member; this seems 
reasonable for a broadcast join.
 # The entire thing is cached (CachedRDDBuilder); is it possible you're 
caching this Dataset? Unlike RDDs, a Dataset's persistence is manually 
managed, hence cached Datasets are not automatically garbage collected once 
the last reference is dropped (see the snippet below).

Having said that, I'd still expect Spark to cache only the in-memory 
representation and not the entire RDD lineage, so this does look like some 
sort of bug, something like an over-capturing function/closure in the caching 
code.
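
As a concrete illustration of the second point (`ds` is a hypothetical cached 
Dataset):
{code:scala}
// Cached Datasets stay pinned until explicitly released; dropping the last
// reference is not enough.
ds.unpersist()                 // or spark.catalog.clearCache() to drop everything
{code}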

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation
> We have a long-running instance of STS.
> With each query execution requiring a broadcast join, an UnsafeHashedRelation 
> is added for cleanup in ContextCleaner. This reference to 
> UnsafeHashedRelation is being held by some other collection and never becomes 
> eligible for GC, and because of this ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-09-14 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614683#comment-16614683
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I see that the PR is now closed (postponed) due to concerns about a 
performance regression when the partitioning is not required by parent 
operators.

I think this can be solved by introducing a rule that identifies an Exchange 
over a Union whose children already satisfy the required partitioning; in this 
case the union operator can be replaced by a partitioning-aware union operator 
that merges the partitions (basically implementing your approach as presented 
in the PR).

I think this approach limits the regression mentioned in the pull request and 
improves performance when the upstream operator requires a matching 
partitioning. Do you think this approach also has to be governed by a 
cost-based decision?

 

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use case we have is a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (the way the join operator does).
> For example, for two bucketed tables a1 and a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-09-14 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614678#comment-16614678
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I've opened SPARK-25203 because of your answer on August 13th.
However, looking at your PR it seems you opted to solve the general case, and 
the bucketed table is just a special case of it.
Do you think SPARK-25203 should be closed as a duplicate of this? If so, can 
you please add the example used there to this issue as well?

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use case we have is a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (the way the join operator does).
> For example, for two bucketed tables a1 and a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Commented] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)

2018-08-22 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589356#comment-16589356
 ] 

Eyal Farago commented on SPARK-25203:
-

Seems I was wrong regarding the resulting distribution.

I modified the query a bit:
{code:java}
select *, spark_partition_id() as P  from (select * from t1DU distribute by c1)
-- !query 15 schema
struct
-- !query 15 output
1   a   43
2   a   174
2   b   174
3   b   51
{code}

It's now obvious that the records are properly clustered (I guess the original 
query computed the partition id BEFORE the repartitioning).

> spark sql, union all does not propagate child partitioning (when possible)
> --
>
> Key: SPARK-25203
> URL: https://issues.apache.org/jira/browse/SPARK-25203
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0, 2.3.0, 2.4.0
>Reporter: Eyal Farago
>Priority: Major
>
> In Spark SQL, UNION ALL does not propagate partitioning when all child plans 
> have the same partitioning; this causes the introduction of unnecessary 
> Exchange nodes when a parent operator requires a distribution satisfied by 
> this partitioning.
>  
> {code:java}
> CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
> CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by 
> c1;
> CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 
> distribute by c11;
> create or REPLACE TEMPORARY VIEW t1DU as
> select * from t1D1
> UNION ALL
> select * from t1D2;
> EXPLAIN select * from t1DU distribute by c1;
> == Physical Plan ==
> Exchange hashpartitioning(c1#x, 200)
> +- Union
>:- Exchange hashpartitioning(c1#x, 200)
>:  +- LocalTableScan [c1#x, c2#x]
>+- Exchange hashpartitioning(c11#x, 200)
>   +- LocalTableScan [c11#x, c2#x]
> {code}
> The Exchange introduced in the last query is unnecessary since the unioned 
> data is already partitioned by column _c1_; in fact, the equivalent RDD 
> operation identifies this scenario and introduces a PartitionerAwareUnionRDD, 
> which maintains the children's shared partitioner.
> I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding 
> _outputPartitioning_ in a way that identifies a common partitioning among the 
> child plans and uses it (falling back to the default implementation 
> otherwise).
> Furthermore, it seems the current implementation does not properly cluster 
> the data:
> {code:java}
> select *, spark_partition_id() as P  from t1DU distribute by c1
> -- !query 15 schema
> struct
> -- !query 15 output
> 1 a   43
> 2 a   374
> 2 b   174
> 3 b   251
> {code}
> notice _c1=2_ in partitions 174 and 374.






[jira] [Commented] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)

2018-08-22 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589332#comment-16589332
 ] 

Eyal Farago commented on SPARK-25203:
-

CC: [~hvanhovell], [~cloud_fan]

> spark sql, union all does not propagate child partitioning (when possible)
> --
>
> Key: SPARK-25203
> URL: https://issues.apache.org/jira/browse/SPARK-25203
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0, 2.3.0, 2.4.0
>Reporter: Eyal Farago
>Priority: Major
>
> In Spark SQL, UNION ALL does not propagate partitioning when all child plans 
> have the same partitioning; this causes the introduction of unnecessary 
> Exchange nodes when a parent operator requires a distribution satisfied by 
> this partitioning.
>  
> {code:java}
> CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
> CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by 
> c1;
> CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 
> distribute by c11;
> create or REPLACE TEMPORARY VIEW t1DU as
> select * from t1D1
> UNION ALL
> select * from t1D2;
> EXPLAIN select * from t1DU distribute by c1;
> == Physical Plan ==
> Exchange hashpartitioning(c1#x, 200)
> +- Union
>:- Exchange hashpartitioning(c1#x, 200)
>:  +- LocalTableScan [c1#x, c2#x]
>+- Exchange hashpartitioning(c11#x, 200)
>   +- LocalTableScan [c11#x, c2#x]
> {code}
> The Exchange introduced in the last query is unnecessary since the unioned 
> data is already partitioned by column _c1_; in fact, the equivalent RDD 
> operation identifies this scenario and introduces a PartitionerAwareUnionRDD, 
> which maintains the children's shared partitioner.
> I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding 
> _outputPartitioning_ in a way that identifies a common partitioning among the 
> child plans and uses it (falling back to the default implementation 
> otherwise).
> Furthermore, it seems the current implementation does not properly cluster 
> the data:
> {code:java}
> select *, spark_partition_id() as P  from t1DU distribute by c1
> -- !query 15 schema
> struct
> -- !query 15 output
> 1 a   43
> 2 a   374
> 2 b   174
> 3 b   251
> {code}
> notice _c1=2_ in partitions 174 and 374.






[jira] [Created] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)

2018-08-22 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-25203:
---

 Summary: spark sql, union all does not propagate child 
partitioning (when possible)
 Key: SPARK-25203
 URL: https://issues.apache.org/jira/browse/SPARK-25203
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 2.3.0, 2.2.0, 2.4.0
Reporter: Eyal Farago


In Spark SQL, UNION ALL does not propagate partitioning when all child plans 
have the same partitioning; this causes the introduction of unnecessary 
Exchange nodes when a parent operator requires a distribution satisfied by 
this partitioning.

 
{code:java}
CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by c1;
CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 
distribute by c11;

create or REPLACE TEMPORARY VIEW t1DU as
select * from t1D1
UNION ALL
select * from t1D2;

EXPLAIN select * from t1DU distribute by c1;

== Physical Plan ==
Exchange hashpartitioning(c1#x, 200)
+- Union
   :- Exchange hashpartitioning(c1#x, 200)
   :  +- LocalTableScan [c1#x, c2#x]
   +- Exchange hashpartitioning(c11#x, 200)
  +- LocalTableScan [c11#x, c2#x]
{code}

The Exchange introduced in the last query is unnecessary since the unioned 
data is already partitioned by column _c1_; in fact, the equivalent RDD 
operation identifies this scenario and introduces a PartitionerAwareUnionRDD, 
which maintains the children's shared partitioner.

I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding 
_outputPartitioning_ in a way that identifies a common partitioning among the 
child plans and uses it (falling back to the default implementation 
otherwise); a sketch follows at the end of this description.

Furthermore, it seems the current implementation does not properly cluster 
the data:

{code:java}
select *, spark_partition_id() as P  from t1DU distribute by c1
-- !query 15 schema
struct
-- !query 15 output
1   a   43
2   a   374
2   b   174
3   b   251
{code}

notice _c1=2_ in partitions 174 and 374.
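
A minimal sketch of the suggested override (hypothetical helper, not actual 
Spark code; a real implementation would also have to remap each child's output 
attributes to the Union's output before comparing partitionings):
{code:scala}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.SparkPlan

// If every child reports the same partitioning, propagate it; otherwise fall
// back to an unknown partitioning, which is effectively today's behaviour.
def unionOutputPartitioning(children: Seq[SparkPlan]): Partitioning =
  children.map(_.outputPartitioning).distinct match {
    case Seq(common) => common
    case _ => UnknownPartitioning(children.map(_.outputPartitioning.numPartitions).sum)
  }
{code}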








[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578789#comment-16578789
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], my bad :)

Seems there are two distinct issues here: one is the general behavior of 
join/aggregate over unions, the other is the guarantees of bucketed 
partitioning.

Looking more carefully at the results of your query, it seems that the two 
DFs are not co-partitioned (which is a bit surprising), so my apologies.

Having said that, there's a more general issue with pushing shuffle-related 
operations down over a union; do you guys think this deserves a separate 
issue?

 

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25103) CompletionIterator may delay GC of completed resources

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578519#comment-16578519
 ] 

Eyal Farago commented on SPARK-25103:
-

CC: [~cloud_fan], [~hvanhovell]

> CompletionIterator may delay GC of completed resources
> --
>
> Key: SPARK-25103
> URL: https://issues.apache.org/jira/browse/SPARK-25103
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.1, 2.1.0, 2.2.0, 2.3.0
>Reporter: Eyal Farago
>Priority: Major
>
> while working on SPARK-22713, I found (and partially fixed) a scenario in 
> which an iterator is already exhausted but still holds a reference to some 
> resources that can be GCed at this point.
> However, these resources can not be GCed because of this reference.
> the specific fix applied in SPARK-22713 was to wrap the iterator with a 
> CompletionIterator that cleans it when exhausted, thing is that it's quite 
> easy to get this wrong by closing over local variables or _this_ reference in 
> the cleanup function itself.
> I propose solving this by modifying CompletionIterator to discard references 
> to the wrapped iterator and cleanup function once exhausted.
>  
>  * a dive into the code showed that most CompletionIterators are eventually 
> used by 
> {code:java}
> org.apache.spark.scheduler.ShuffleMapTask#runTask{code}
> which does:
> {code:java}
> writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: 
> Product2[Any, Any]]]){code}
> looking at 
> {code:java}
> org.apache.spark.shuffle.ShuffleWriter#write{code}
> implementations, it seems all of them first exhaust the iterator and then 
> perform some kind of post-processing: i.e. merging spills, sorting, writing 
> partitions files and then concatenating them into a single file... bottom 
> line the Iterator may actually be 'sitting' for some time after being 
> exhausted.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25103) CompletionIterator may delay GC of completed resources

2018-08-13 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-25103:
---

 Summary: CompletionIterator may delay GC of completed resources
 Key: SPARK-25103
 URL: https://issues.apache.org/jira/browse/SPARK-25103
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0, 2.2.0, 2.1.0, 2.0.1
Reporter: Eyal Farago


While working on SPARK-22713, I found (and partially fixed) a scenario in which 
an iterator is already exhausted but still holds a reference to some resources 
that could be GCed at this point.

However, these resources cannot be GCed because of this reference.

The specific fix applied in SPARK-22713 was to wrap the iterator with a 
CompletionIterator that cleans it up when exhausted; the thing is that it's quite 
easy to get this wrong by closing over local variables or the _this_ reference in 
the cleanup function itself.

I propose solving this by modifying CompletionIterator to discard its references 
to the wrapped iterator and the cleanup function once it is exhausted, along the 
lines of the sketch below.
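A minimal sketch of the proposed behaviour (this is not the actual org.apache.spark.util.CompletionIterator, just an illustration of dropping both references as soon as the underlying iterator is exhausted):
{code:java}
// Sketch only: wraps an iterator, runs a completion callback exactly once when the
// underlying iterator is exhausted, and then drops the references to both the wrapped
// iterator and the callback so they become eligible for GC.
class ReleasingCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
  extends Iterator[A] {

  private[this] var iter: Iterator[A] = sub
  private[this] var onComplete: () => Unit = completion
  private[this] var completed = false

  override def hasNext: Boolean = {
    if (completed) {
      false
    } else if (iter.hasNext) {
      true
    } else {
      onComplete()              // clean up exactly once
      iter = Iterator.empty     // drop the reference to the exhausted iterator
      onComplete = () => ()     // drop the reference to the (possibly capturing) closure
      completed = true
      false
    }
  }

  override def next(): A = iter.next()
}

// hypothetical usage: the reader below becomes collectable as soon as iteration completes
// val it = new ReleasingCompletionIterator(reader.toIterator, () => reader.close())
{code}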

 
 * a dive into the code showed that most CompletionIterators are eventually 
used by 
{code:java}
org.apache.spark.scheduler.ShuffleMapTask#runTask{code}
which does:

{code:java}
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: 
Product2[Any, Any]]]){code}

looking at 
{code:java}
org.apache.spark.shuffle.ShuffleWriter#write{code}
implementations, it seems all of them first exhaust the iterator and then 
perform some kind of post-processing: e.g. merging spills, sorting, writing 
partition files and then concatenating them into a single file... bottom line, 
the iterator may actually be 'sitting' around for some time after being exhausted.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578489#comment-16578489
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I think your conclusion about co-partitioning is wrong. Consider the 
following code segment from your comment:
{code:java}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
This prints the partition ids as assigned by the union. Assuming union simply 
concatenates the partitions from df1 and df2 and assigns them a running id, it 
makes perfect sense that you'd get two partitions per key: one coming from df1 
and the other from df2.

Applying this select on each dataframe separately, you'd get the exact same 
results, meaning a given key has the same partition id in both dataframes.

I think this code fragment basically shows what's wrong with the current 
implementation of Union, not that we can't optimize unions of co-partitioned 
relations.

If union was a bit more 'partitioning aware' it would be able to identify that both 
children have the same partitioning scheme and 'inherit' it. As you actually 
showed, this might be a bit tricky since the same logical attribute has a different 
expression id in each child, but Union eventually maps these child attributes into 
a single output attribute, so this information can be used to resolve the 
partitioning columns and determine their equality.

Furthermore, Union being smarter about its output partitioning won't cut it on its 
own; a few rules have to be added/modified:

1. Applying an exchange on a union should sometimes be pushed to the children: the 
children can be partitioned into those supporting the required partitioning and 
those that don't, the exchange is applied to a union of the non-supporting 
children, and the result is then unioned with the rest of the children (see the 
sketch after this list).
 2. A partial aggregate also has to be pushed to the children, resulting in a 
union of partial aggregations; again it's possible to partition the children 
according to their support of the required partitioning.
 3. A final aggregation over a union introduces an exchange which will then be 
pushed to the children; the aggregation is then applied on top of the 
partitioning-aware union (think of the way PartitionerAwareUnionRDD handles 
partitioning).
 * partition the children = partitioning a collection by a predicate 
(scala.collection.TraversableLike#partition)
 * Other operators like join may require additional rules.
 * Some of these ideas were discussed offline with [~hvanhovell]
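A tiny sketch of the 'partition the children' idea from rule 1 above, in plain Scala with hypothetical placeholders (Plan, satisfiesRequired and applyExchange are illustrative stand-ins, not Catalyst APIs):
{code:java}
// Sketch only: split union children by whether their output partitioning already
// satisfies the required distribution, and shuffle only the non-supporting ones.
case class Plan(name: String, satisfiesRequired: Boolean)

def pushExchangeIntoUnion(children: Seq[Plan])(applyExchange: Seq[Plan] => Plan): Seq[Plan] = {
  // scala.collection.TraversableLike#partition: one pass, two groups
  val (supporting, nonSupporting) = children.partition(_.satisfiesRequired)
  if (nonSupporting.isEmpty) supporting
  else supporting :+ applyExchange(nonSupporting) // Exchange(Union(non-supporting children))
}

// illustrative usage
val rewritten = pushExchangeIntoUnion(
  Seq(Plan("a1", satisfiesRequired = true), Plan("a2", satisfiesRequired = true),
      Plan("b", satisfiesRequired = false))) { bad =>
  Plan(s"Exchange(Union(${bad.map(_.name).mkString(",")}))", satisfiesRequired = true)
}
{code}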

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: 

[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578322#comment-16578322
 ] 

Eyal Farago commented on SPARK-22713:
-

[~jerrylead], can you please test this?

> OOM caused by the memory contention and memory leak in TaskMemoryManager
> 
>
> Key: SPARK-22713
> URL: https://issues.apache.org/jira/browse/SPARK-22713
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.1.1, 2.1.2
>Reporter: Lijie Xu
>Assignee: Eyal Farago
>Priority: Critical
> Fix For: 2.4.0
>
>
> The pdf version of this issue with high-quality figures is available at 
> https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf.
> *[Abstract]* 
> I recently encountered an OOM error in a PageRank application 
> (_org.apache.spark.examples.SparkPageRank_). After profiling the application, 
> I found the OOM error is related to the memory contention in shuffle spill 
> phase. Here, the memory contention means that a task tries to release some 
> old memory consumers from memory for keeping the new memory consumers. After 
> analyzing the OOM heap dump, I found the root cause is a memory leak in 
> _TaskMemoryManager_. Since memory contention is common in shuffle phase, this 
> is a critical bug/defect. In the following sections, I will use the 
> application dataflow, execution log, heap dump, and source code to identify 
> the root cause.
> *[Application]* 
> This is a PageRank application from Spark’s example library. The following 
> figure shows the application dataflow. The source code is available at \[1\].
> !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%!
> *[Failure symptoms]*
> This application has a map stage and many iterative reduce stages. An OOM 
> error occurs in a reduce task (Task-28) as follows.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/Stage.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/task.png?raw=true|width=100%!
>  
> *[OOM root cause identification]*
> Each executor has 1 CPU core and 6.5GB memory, so it only runs one task at a 
> time. After analyzing the application dataflow, error log, heap dump, and 
> source code, I found the following steps lead to the OOM error. 
> => The MemoryManager found that there is not enough memory to cache the 
> _links:ShuffledRDD_ (rdd-5-28, red circles in the dataflow figure).
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/ShuffledRDD.png?raw=true|width=100%!
> => The task needs to shuffle twice (1st shuffle and 2nd shuffle in the 
> dataflow figure).
> => The task needs to generate two _ExternalAppendOnlyMap_ (E1 for 1st shuffle 
> and E2 for 2nd shuffle) in sequence.
> => The 1st shuffle begins and ends. E1 aggregates all the shuffled data of 
> 1st shuffle and achieves 3.3 GB.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/FirstShuffle.png?raw=true|width=100%!
> => The 2nd shuffle begins. E2 is aggregating the shuffled data of 2nd 
> shuffle, and finding that there is not enough memory left. This triggers the 
> memory contention.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/SecondShuffle.png?raw=true|width=100%!
> => To handle the memory contention, the _TaskMemoryManager_ releases E1 
> (spills it onto disk) and assumes that the 3.3GB space is free now.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/MemoryContention.png?raw=true|width=100%!
> => E2 continues to aggregates the shuffled records of 2nd shuffle. However, 
> E2 encounters an OOM error while shuffling.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMbefore.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMError.png?raw=true|width=100%!
> *[Guess]* 
> The task memory usage below reveals that there is not memory drop down. So, 
> the cause may be that the 3.3GB _ExternalAppendOnlyMap_ (E1) is not actually 
> released by the _TaskMemoryManger_. 
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/GCFigure.png?raw=true|width=100%!
> *[Root cause]* 
> After analyzing the heap dump, I found the guess is right (the 3.3GB 
> _ExternalAppendOnlyMap_ is actually not released). The 1.6GB object is 
> _ExternalAppendOnlyMap (E2)_.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/heapdump.png?raw=true|width=100%!
> *[Question]* 
> Why the released _ExternalAppendOnlyMap_ is still in memory?
>

[jira] [Commented] (SPARK-22286) OutOfMemoryError caused by memory leak and large serializer batch size in ExternalAppendOnlyMap

2018-05-20 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481961#comment-16481961
 ] 

Eyal Farago commented on SPARK-22286:
-

[~jerrylead], don't you think the actual root cause here is the use of Java 
serialization? Anyway, that's what I gather from looking at the last image, the 
one showing ObjectInputStream$HandleTable summing up to 400MB.

Regarding the potential solution's 1st action item, do you mean removing items from 
the map as the iterator makes progress? I'm not sure that'll make much of a 
difference here; in any case this involves some bookkeeping, as the memory has to be 
reported back to the TaskMemoryManager (-which seems not to be informed of any 
allocation by this class- which is informed of memory allocation via calls to 
maybeSpill() from insertAll()).

One thing that troubles me about this class is the fact that it keeps a potentially 
large buffer per spilled map, I'm referring to:
{code:java}
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
{code}
where each DiskMapIterator has a buffer large enough to store a batch of 
records (10,000 by default); this, combined with the fact that this buffer size is 
effectively unbounded, may result in a substantially sized buffer (per spill).

Re. action item #3, I think Kryo does the trick. BTW, why use batches at all? 
This is basically a sequential read deserializing one record at a time, so it 
makes sense to buffer disk reads, but why batch? (Unless this was designed with 
Java serialization pitfalls in mind in the first place.)
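A back-of-the-envelope illustration of why these per-spill buffers worry me (the record size and spill count below are assumptions made for the sake of the arithmetic, not numbers taken from the reported heap dump):
{code:java}
// Illustrative arithmetic only: upper bound on memory held by per-spill read buffers
// if each DiskMapIterator can buffer up to a full batch of deserialized records.
val batchSize      = 10000L  // spark.shuffle.spill.batchSize default
val avgRecordBytes = 512L    // assumption: average deserialized record size
val spills         = 20L     // assumption: number of spill files accumulated by the task

val perSpillBufferBytes = batchSize * avgRecordBytes   // ~5 MB per spilled map
val totalBufferBytes    = spills * perSpillBufferBytes // ~100 MB held at the same time

println(s"up to ${totalBufferBytes / (1L << 20)} MB just in spill read buffers")
{code}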

> OutOfMemoryError caused by memory leak and large serializer batch size in 
> ExternalAppendOnlyMap
> ---
>
> Key: SPARK-22286
> URL: https://issues.apache.org/jira/browse/SPARK-22286
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.1.1, 2.1.2
>Reporter: Lijie Xu
>Priority: Critical
>
> *[Abstract]* 
> I recently encountered an OOM error in a simple _groupByKey_ application. 
> After profiling the application, I found the OOM error is related to the 
> shuffle spill and records (de)serialization. After analyzing the OOM heap 
> dump, I found the root causes are (1) memory leak in ExternalAppendOnlyMap, 
> (2) large static serializer batch size (_spark.shuffle.spill.batchSize_ 
> =10000) defined in ExternalAppendOnlyMap, and (3) memory leak in the 
> deserializer. Since almost all the Spark applications rely on 
> ExternalAppendOnlyMap to perform shuffle and reduce, this is a critical 
> bug/defect. In the following sections, I will detail the testing application, 
> data, environment, failure symptoms, diagnosing procedure, identified root 
> causes, and potential solutions.
> *[Application]* 
> This is a simple GroupBy application as follows.
> {code}
> table.map(row => (row.sourceIP[1,7], row)).groupByKey().saveAsTextFile()
> {code}
> The _sourceIP_ (an IP address like 127.100.101.102) is a column of the 
> _UserVisits_ table. This application has the same logic as the aggregation 
> query in Berkeley SQL benchmark (https://amplab.cs.berkeley.edu/benchmark/) 
> as follows. 
> {code}
>   SELECT * FROM UserVisits
>   GROUP BY SUBSTR(sourceIP, 1, 7);
> {code}
> The application code is available at \[1\].
> *[Data]* 
> The UserVisits table size is 16GB (9 columns, 132,000,000 rows) with uniform 
> distribution. The HDFS block size is 128MB. The data generator is available 
> at \[2\].
> *[Environment]* 
> Spark 2.1 (Spark 2.2 may also have this error), Oracle Java Hotspot 1.8.0, 1 
> master and 8 workers as follows.
> !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Workers.png|width=100%!
> This application launched 32 executors. Each executor has 1 core and 7GB 
> memory. The detailed application configuration is
> {code}
>total-executor-cores = 32
>executor-cores = 1 
>executor-memory = 7G
>spark.default.parallelism=32 
>spark.serializer = JavaSerializer (KryoSerializer also has OOM error)
> {code}
> *[Failure symptoms]*
> This application has a map stage and a reduce stage. An OOM error occurs in a 
> reduce task (Task-17) as follows.
> !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Stage.png|width=100%!
> !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Tasks.png|width=100%!
>  
> Task-17 generated an OOM error. It shuffled ~1GB data and spilled 3.6GB data 
> onto the disk.
> Task-17 log below shows that this task is reading the next record by invoking 
> _ExternalAppendOnlyMap.hasNext_(). From the OOM stack traces and the above 
> shuffle metrics, we cannot identify the OOM root causes. 
> !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/OOMStackTrace.png|

[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager

2018-05-20 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481881#comment-16481881
 ] 

Eyal Farago commented on SPARK-22713:
-

[~jerrylead], what's the relation to SPARK-22286? Does solving this at least 
partially solve that as well?

> OOM caused by the memory contention and memory leak in TaskMemoryManager
> 
>
> Key: SPARK-22713
> URL: https://issues.apache.org/jira/browse/SPARK-22713
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.1.1, 2.1.2
>Reporter: Lijie Xu
>Priority: Critical
>
> The pdf version of this issue with high-quality figures is available at 
> https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf.
> *[Abstract]* 
> I recently encountered an OOM error in a PageRank application 
> (_org.apache.spark.examples.SparkPageRank_). After profiling the application, 
> I found the OOM error is related to the memory contention in shuffle spill 
> phase. Here, the memory contention means that a task tries to release some 
> old memory consumers from memory for keeping the new memory consumers. After 
> analyzing the OOM heap dump, I found the root cause is a memory leak in 
> _TaskMemoryManager_. Since memory contention is common in shuffle phase, this 
> is a critical bug/defect. In the following sections, I will use the 
> application dataflow, execution log, heap dump, and source code to identify 
> the root cause.
> *[Application]* 
> This is a PageRank application from Spark’s example library. The following 
> figure shows the application dataflow. The source code is available at \[1\].
> !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%!
> *[Failure symptoms]*
> This application has a map stage and many iterative reduce stages. An OOM 
> error occurs in a reduce task (Task-28) as follows.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/Stage.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/task.png?raw=true|width=100%!
>  
> *[OOM root cause identification]*
> Each executor has 1 CPU core and 6.5GB memory, so it only runs one task at a 
> time. After analyzing the application dataflow, error log, heap dump, and 
> source code, I found the following steps lead to the OOM error. 
> => The MemoryManager found that there is not enough memory to cache the 
> _links:ShuffledRDD_ (rdd-5-28, red circles in the dataflow figure).
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/ShuffledRDD.png?raw=true|width=100%!
> => The task needs to shuffle twice (1st shuffle and 2nd shuffle in the 
> dataflow figure).
> => The task needs to generate two _ExternalAppendOnlyMap_ (E1 for 1st shuffle 
> and E2 for 2nd shuffle) in sequence.
> => The 1st shuffle begins and ends. E1 aggregates all the shuffled data of 
> 1st shuffle and achieves 3.3 GB.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/FirstShuffle.png?raw=true|width=100%!
> => The 2nd shuffle begins. E2 is aggregating the shuffled data of 2nd 
> shuffle, and finding that there is not enough memory left. This triggers the 
> memory contention.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/SecondShuffle.png?raw=true|width=100%!
> => To handle the memory contention, the _TaskMemoryManager_ releases E1 
> (spills it onto disk) and assumes that the 3.3GB space is free now.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/MemoryContention.png?raw=true|width=100%!
> => E2 continues to aggregates the shuffled records of 2nd shuffle. However, 
> E2 encounters an OOM error while shuffling.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMbefore.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMError.png?raw=true|width=100%!
> *[Guess]* 
> The task memory usage below reveals that there is not memory drop down. So, 
> the cause may be that the 3.3GB _ExternalAppendOnlyMap_ (E1) is not actually 
> released by the _TaskMemoryManger_. 
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/GCFigure.png?raw=true|width=100%!
> *[Root cause]* 
> After analyzing the heap dump, I found the guess is right (the 3.3GB 
> _ExternalAppendOnlyMap_ is actually not released). The 1.6GB object is 
> _ExternalAppendOnlyMap (E2)_.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/heapdump.png?raw=true|width=100%!
> *[Question]* 
> Why the released _ExternalAppendOnlyMap_ is still in memory?

[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager

2018-05-18 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481468#comment-16481468
 ] 

Eyal Farago commented on SPARK-22713:
-

[~jerrylead], excellent investigation and description of the issue; I'll open a 
PR shortly.

> OOM caused by the memory contention and memory leak in TaskMemoryManager
> 
>
> Key: SPARK-22713
> URL: https://issues.apache.org/jira/browse/SPARK-22713
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.1.1, 2.1.2
>Reporter: Lijie Xu
>Priority: Critical
>
> The pdf version of this issue with high-quality figures is available at 
> https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf.
> *[Abstract]* 
> I recently encountered an OOM error in a PageRank application 
> (_org.apache.spark.examples.SparkPageRank_). After profiling the application, 
> I found the OOM error is related to the memory contention in shuffle spill 
> phase. Here, the memory contention means that a task tries to release some 
> old memory consumers from memory for keeping the new memory consumers. After 
> analyzing the OOM heap dump, I found the root cause is a memory leak in 
> _TaskMemoryManager_. Since memory contention is common in shuffle phase, this 
> is a critical bug/defect. In the following sections, I will use the 
> application dataflow, execution log, heap dump, and source code to identify 
> the root cause.
> *[Application]* 
> This is a PageRank application from Spark’s example library. The following 
> figure shows the application dataflow. The source code is available at \[1\].
> !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%!
> *[Failure symptoms]*
> This application has a map stage and many iterative reduce stages. An OOM 
> error occurs in a reduce task (Task-28) as follows.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/Stage.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/task.png?raw=true|width=100%!
>  
> *[OOM root cause identification]*
> Each executor has 1 CPU core and 6.5GB memory, so it only runs one task at a 
> time. After analyzing the application dataflow, error log, heap dump, and 
> source code, I found the following steps lead to the OOM error. 
> => The MemoryManager found that there is not enough memory to cache the 
> _links:ShuffledRDD_ (rdd-5-28, red circles in the dataflow figure).
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/ShuffledRDD.png?raw=true|width=100%!
> => The task needs to shuffle twice (1st shuffle and 2nd shuffle in the 
> dataflow figure).
> => The task needs to generate two _ExternalAppendOnlyMap_ (E1 for 1st shuffle 
> and E2 for 2nd shuffle) in sequence.
> => The 1st shuffle begins and ends. E1 aggregates all the shuffled data of 
> 1st shuffle and achieves 3.3 GB.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/FirstShuffle.png?raw=true|width=100%!
> => The 2nd shuffle begins. E2 is aggregating the shuffled data of 2nd 
> shuffle, and finding that there is not enough memory left. This triggers the 
> memory contention.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/SecondShuffle.png?raw=true|width=100%!
> => To handle the memory contention, the _TaskMemoryManager_ releases E1 
> (spills it onto disk) and assumes that the 3.3GB space is free now.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/MemoryContention.png?raw=true|width=100%!
> => E2 continues to aggregates the shuffled records of 2nd shuffle. However, 
> E2 encounters an OOM error while shuffling.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMbefore.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMError.png?raw=true|width=100%!
> *[Guess]* 
> The task memory usage below reveals that there is not memory drop down. So, 
> the cause may be that the 3.3GB _ExternalAppendOnlyMap_ (E1) is not actually 
> released by the _TaskMemoryManger_. 
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/GCFigure.png?raw=true|width=100%!
> *[Root cause]* 
> After analyzing the heap dump, I found the guess is right (the 3.3GB 
> _ExternalAppendOnlyMap_ is actually not released). The 1.6GB object is 
> _ExternalAppendOnlyMap (E2)_.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/heapdump.png?raw=true|width=100%!
> *[Question]* 
> Why the released _ExternalAppendOnlyMap_ is still in memory?
> The source co

[jira] [Commented] (SPARK-21109) union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe

2018-04-26 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453625#comment-16453625
 ] 

Eyal Farago commented on SPARK-21109:
-

I've also encountered this issue in two separate ways.

In one of them I got a cast exception similar to the one [~superwai] 
encountered; however, in a slightly different one I got wrong results:


{code:java}
case class DataV1(a: Int)
case class DataV2(a: Int, b: Int)

test("weired spark bug"){
  import sqlContext.implicits._
  val df1 = List(DataV1(1)).toDS().toDF()
  val df2 = List(DataV2(2, 3)).toDS().toDF()

  val df3 = df1
.withColumn("commitUUID", functions lit 1)
.withColumn("b", functions lit 42)

  val df4 = df2
.withColumn("commitUUID", functions lit 2)
.withColumn("b", functions.coalesce($"b", functions lit 42))

  val u = df3 union df4
  u.as[DataV2].show
}
{code}
and the output:
{code:java}
+---+----------+---+
|  a|commitUUID|  b|
+---+----------+---+
|  1|         1| 42|
|  2|         3|  2|
+---+----------+---+
{code}

You can see that in the second line the value 2 was assigned to column 'b' instead 
of 'commitUUID', which was assigned b's value (3).
Slightly modifying this example and making the commitUUID field a String 
results in an exception similar to the one [~superwai] reported:


{code:java}
Cannot up cast `b` from string to int as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher 
precision type of the field in the target object;
org.apache.spark.sql.AnalysisException: Cannot up cast `b` from string to int 
as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher 
precision type of the field in the target object;
{code}

[~viirya], I've seen [~rxin]'s comment on SPARK-21043 and I read your reasoning 
in the comments above, and I must say this looks like 'justifying' the 
original implementation detail. It simply wasn't handled properly when the 
union method was first introduced, then some people wrote code that relied on 
the 'by index' mapping, and by then it could no longer be modified...

I think you can see why it doesn't make much sense from the user's perspective: 
as far as she cares, both sides of the union are of the same type, and the fact 
that union can't see it is a bug in its implementation which is now deemed a 
feature.

If we put aside the backward-compatibility issue, then fixing this is as simple 
as adding an analyzer rule that introduces a Project if needed; an alternative 
might be to reject this use case during analysis. The way I see it, the current 
state of affairs is greatly misleading.

[~superwai], one way of dealing with this situation (aside from upgrading Spark 
and using the newly introduced unionByName) is to extract the field order of 
one side and 'force' it on the other:


{code:java}
//make sure to align the schema
val right2 = right.select(left.columns.map(functions.col _) : _*)
left union right2
{code}
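For reference, the unionByName route mentioned above (available since Spark 2.3) matches columns by name instead of by position; with the df3/df4 frames from the earlier snippet it would look roughly like:
{code:java}
// Hypothetical usage of Dataset.unionByName (Spark 2.3+): columns are matched by name,
// so commitUUID and b no longer get swapped.
val u = df3.unionByName(df4)
u.as[DataV2].show()
{code}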



> union two dataset[A] don't work as expected if one of the datasets is 
> originated from a dataframe
> -
>
> Key: SPARK-21109
> URL: https://issues.apache.org/jira/browse/SPARK-21109
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Jerry Lam
>Priority: Major
>
> To reproduce the issue:
> {code}
> case class my_case(id0: Long, id1: Int, id2: Int, id3: String)
> val data1 = Seq(my_case(0L, 0, 0, "0")).toDS
> val data2 = Seq(("1", 1, 1, 1L)).toDF("id3", "id1", "id2", "id0").as[my_case]
> data1.show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> +---+---+---+---+
> data2.show
> +---+---+---+---+
> |id3|id1|id2|id0|
> +---+---+---+---+
> |  1|  1|  1|  1|
> +---+---+---+---+
> data1.union(data2).show
> org.apache.spark.sql.AnalysisException: Cannot up cast `id0` from string to 
> bigint as it may truncate
> The type path of the target object is:
> - field (class: "scala.Long", name: "id0")
> - root class: "my_case"
> You can either add an explicit cast to the input data or choose a higher 
> precision type of the field in the target object;
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2123)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2153)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-09 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358668#comment-16358668
 ] 

Eyal Farago commented on SPARK-19870:
-

I'll remember to share relevant future logs 😎
Re. the exception code path missing a cleanup, you're definitely right, but I'm 
less concerned about this one as this code path is 'reserved' for tasks (I don't 
think Netty threads ever get to this code), hence cleanup (+warning) is 
guaranteed.

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
>Priority: Major
> Attachments: cs.executor.log, stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> jav

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-09 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358458#comment-16358458
 ] 

Eyal Farago commented on SPARK-19870:
-

[~irashid], I'm afraid I don't have documentation of which executor got to 
this hang, so I can't think of a way to find its logs (on top of this, the 
spark-ui via the history server seems a bit unreliable, i.e. jobs 'running' in the 
UI are reported as complete in the executor logs).

Can you please share what it is you're looking for in the executor logs? As 
you could see in the one I've shared, Spark's logging level is set to WARN so 
there's not much in it...

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
>Priority: Major
> Attachments: cs.executor.log, stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apac

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-08 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356992#comment-16356992
 ] 

Eyal Farago commented on SPARK-19870:
-

[~irashid], attached a sample executor log

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
>Priority: Major
> Attachments: cs.executor.log, stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> A full stack trace is attached, but those seem to be the offending threads.
> This happens across several

[jira] [Updated] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-08 Thread Eyal Farago (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eyal Farago updated SPARK-19870:

Attachment: cs.executor.log

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
>Priority: Major
> Attachments: cs.executor.log, stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> A full stack trace is attached, but those seem to be the offending threads.
> This happens across several different executors, and has persisted through 
> several runs of th

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-07 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355174#comment-16355174
 ] 

Eyal Farago commented on SPARK-19870:
-

[~irashid], I went through the executors' logs and found no errors.

I did find some decent GC pauses and warnings like this: 
{quote}WARN Executor: 1 block locks were not released by TID = 5413: [rdd_53_42]
{quote}
This aligns with my finding that tasks clean up after themselves, theoretically 
in case of failure as well.

Since both distros we're using do not offer any of the relevant versions 
including your fix, I can't test this scenario with your patch (aside from 
other practical constraints :-)), so I can't really verify my assumption about 
the link between SPARK-19870 and SPARK-22083.

[~stevenruppert], [~gdanov], can you guys try running with one of the relevant 
versions?

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
>Priority: Major
> Attachments: stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(Shuff

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2018-02-06 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353651#comment-16353651
 ] 

Eyal Farago commented on SPARK-19870:
-

We've also seen something very similar (stack traces attached) with Spark 2.1.0. In our case this happened after several OOM conditions and executors being killed and restarted (a new AWS account was limited on the instance type we chose).

I've stared at these stack traces a lot and 'walked along' the block manager (and block info manager) code paths. My conclusion was that a thread leaks a writer lock for at least one block; further investigation shows that tasks are registered for cleanup, which should eliminate the potential problem (the block info manager maintains maps of readers/writers per block).

However, there are other threads living inside a Spark executor, such as the BlockTransferService, that are not tasks and hence have no task ID and no guaranteed cleanup...

I think I found a potential weak spot in the way Spark evicts blocks from memory (org.apache.spark.storage.memory.MemoryStore#evictBlocksToFreeSpace) that can keep holding locks in case of failure. SPARK-22083 seems to fix this, so I'd recommend trying out one of the versions that include the fix (we're currently checking whether that's possible with the distros we're using).
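
To make the suspected failure mode concrete, here is a minimal, self-contained sketch (made-up code, not Spark's MemoryStore) of the pattern: write locks are taken on all eviction candidates up front, so an exception while dropping one block leaks the locks of the remaining candidates unless they are explicitly released in a finally block.
{code}
import scala.collection.mutable

object EvictionSketch {
  final case class BlockId(name: String)

  // toy stand-in for the BlockInfoManager's per-block write locks
  final class ToyLockManager {
    private val writeLocked = mutable.Set.empty[BlockId]
    def lockForWriting(id: BlockId): Unit = synchronized { writeLocked += id }
    def unlock(id: BlockId): Unit = synchronized { writeLocked -= id }
    def held: Set[BlockId] = synchronized { writeLocked.toSet }
  }

  // locks all candidates up front, then drops them one by one; without the
  // finally block, a failing drop would leak the locks of the remaining blocks
  def evict(candidates: Seq[BlockId], drop: BlockId => Unit, locks: ToyLockManager): Unit = {
    candidates.foreach(locks.lockForWriting)
    var dropped = 0
    try {
      candidates.foreach { id => drop(id); locks.unlock(id); dropped += 1 }
    } finally {
      candidates.drop(dropped).foreach(locks.unlock)
    }
  }

  def main(args: Array[String]): Unit = {
    val locks = new ToyLockManager
    val blocks = (1 to 3).map(i => BlockId(s"rdd_53_$i"))
    try evict(blocks, id => if (id.name.endsWith("2")) sys.error(s"failed to drop $id"), locks)
    catch { case _: RuntimeException => () }
    println(s"locks still held after the failure: ${locks.held}") // empty, thanks to the finally block
  }
}
{code}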

 

[~irashid], you fixed SPARK-22083; do the scenarios referred to in this issue resemble what you were fixing? Does my analysis make sense to you?

 

attaching my stack traces:
{noformat}
Thread ID   Thread Name Thread State    Thread Locks

148 Executor task launch worker for task 75639  WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@155076597})
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

168 Executor task launch worker for task 75642  WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1540715596})
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
org.apache.spark.rdd.CoGroup

[jira] [Commented] (SPARK-18067) SortMergeJoin adds shuffle if join predicates have non partitioned columns

2018-02-05 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352594#comment-16352594
 ] 

Eyal Farago commented on SPARK-18067:
-

[~tejasp], this issue and the associated pull request seem to relax the distribution requirements of the top-level join.

I have an opposite case where the join has one key but one of its children uses 
two keys (including the join key) for aggregation.

 

Spark 2.1 plans this (poorly?) with two exchange operators: one for the aggregate based on both keys and another for the join (based on one key).

 

I think that in this case the planner could relax the distribution requirements of the aggregation (the child) and rely on the fact that distributing by one key implies distributing by both keys (with the caveats that data might get skewed, we may need to spill during aggregation, etc.).

 

A few questions:
 # Is there any specific reason your approach is limited to joins rather than general use cases of 'stacked' exchange operators?
 # Do you think there's room for a second strategy that relaxes the distribution requirements of the inner exchange (the aggregate in my case)?

I'm also attaching an example plan I'm currently getting (slightly modified):
{noformat}
== Physical Plan ==
*Project [v1 AS v1#472, v2#311L AS v2#473L, v3#312L AS v3#474L, key1#355L AS 
k1#475L, key2#385 AS key2#477]
+- *SortMergeJoin [key1#355L], [key1#304L], Inner
  :- *Sort [key1#355L ASC NULLS FIRST], false, 0
  :  +- Exchange hashpartitioning(key1#355L, 8)
  : +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], 
output=[key1#355L, key2#385])
  :+- Exchange hashpartitioning(key1#355L, key2#385, 8)
  :   +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], 
output=[key1#355L, key2#385])
  :  +- *Project [key1#355L, UDF(v4#365) AS key2#385]
  : +- *Filter ((isnotnull(key1#355L) && NOT (key1#355L = -1)) 
&& v4#365 IN (s1,s2,s3,s4))
  :+- *FileScan parquet [key1#355L,v4#365,commitUUID#366] 
Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:
mp/loads_only_samples_in_comparison_list5387414497663000634/008d9ea9-a0c..., 
PartitionCount: 1, PartitionFilters: [commitUUID#366 IN 
(oldCommit,commit-uuid)], PushedFilters: [IsNotNull(key1), 
Not(EqualTo(key1,-1)), In(v4, [s,s2,s3..., ReadSchema: 
struct
  +- *Sort [key1#304L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(key1#304L, 8)
+- *Project [key1#304L, v1, v2#311L, v3#312L]
{noformat}

Notice the two exchanges in the join's first child.
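
For reference, a minimal sketch of the query shape that produces this kind of plan (table and column names are made up, and broadcast joins are disabled so the sort-merge join and its exchanges show up):
{code}
import org.apache.spark.sql.SparkSession

object TwoExchangesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("two-exchanges")
      .config("spark.sql.autoBroadcastJoinThreshold", "-1") // force a sort-merge join
      .getOrCreate()
    import spark.implicits._

    val left  = Seq((1L, "a"), (2L, "b")).toDF("key1", "key2")
    val right = Seq((1L, 10L), (2L, 20L)).toDF("key1", "v2")

    // aggregate keyed on (key1, key2), joined on key1 alone:
    // hashpartitioning(key1) would satisfy both, yet two exchanges are planned
    val aggregated = left.groupBy($"key1", $"key2").count()
    aggregated.join(right, "key1").explain()

    spark.stop()
  }
}
{code}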

> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --
>
> Key: SPARK-18067
> URL: https://issues.apache.org/jira/browse/SPARK-18067
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Paul Jones
>Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 10).map(x => Data1(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 10).map(x => Data2(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>:- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation 
> [key#0,value1#1], true, 1, StorageLevel(true, true, false, true, 1), Sort 
> [key#0 ASC], false, 0, None
>+- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation 
> [key#12,value2#13], true, 1, StorageLevel(true, true, false, true, 1), 
> Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter our join gets less efficient and we end up with a 
> shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === 
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>:- Sort [value1#1 ASC,key#0 ASC], false, 0
>:  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>: +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation 
> [key#0,value1#1], true, 1, StorageLevel(true, true, false, true, 1), Sort 
> [key#0 ASC], false, 0, None
>+- Sort [value2#13 ASC,key#12 ASC], false, 0
>   +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>  +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation 
> [key#12,value2#13], true, 1, St

[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter

2017-12-12 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287461#comment-16287461
 ] 

Eyal Farago commented on SPARK-21867:
-

[~ericvandenbergfb], looks good, a few questions though:
1. What is the initial number of sorters?
2. Assuming the initial number of sorters is >1, doesn't this mean you're potentially increasing the number of spills? If a single sorter would spill after N records, a multi-sorter setup will spill after N/k records (where k is the number of sorters), so doesn't this mean more spills and merges?
3. When a sorter hits the spill threshold, does it spill immediately, or will it keep going if there's available execution memory? It might make sense to spill and raise the threshold in this condition...
4. Merging spills may require some attention: compression, encryption, etc., as well as avoiding too many open files/buffers at the same time.

Looking forward to the PR.
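
To make questions 2 and 3 a bit more concrete, here is a minimal, self-contained sketch (made-up code, not the proposed UnsafeShuffleWriter changes) of the double-buffering idea from the attached design: records keep filling an active region while the previous region is sorted and spilled on a background thread, with at most one spill in flight.
{code}
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object AsyncSpillSketch {
  final class Writer(regionSize: Int, spillSink: Seq[Int] => Unit)(implicit ec: ExecutionContext) {
    private var active = new ArrayBuffer[Int](regionSize)
    private var inFlight: Future[Unit] = Future.successful(())

    def insert(record: Int): Unit = {
      if (active.size == regionSize) flip()
      active += record
    }

    // swap regions and spill the full one asynchronously; inserts continue
    // into the fresh region while the previous spill runs in the background
    private def flip(): Unit = {
      Await.result(inFlight, Duration.Inf) // at most one spill in flight
      val spilling = active
      active = new ArrayBuffer[Int](regionSize)
      inFlight = Future(spillSink(spilling.sorted.toSeq))
    }

    def close(): Unit = { flip(); Await.result(inFlight, Duration.Inf) }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val spills = ArrayBuffer.empty[Seq[Int]]
    val writer = new Writer(regionSize = 4, spillSink = s => spills.synchronized(spills += s))
    (20 to 1 by -1).foreach(writer.insert)
    writer.close()
    spills.foreach(println) // each spill is an independently sorted run, merged later
    pool.shutdown()
  }
}
{code}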

> Support async spilling in UnsafeShuffleWriter
> -
>
> Key: SPARK-21867
> URL: https://issues.apache.org/jira/browse/SPARK-21867
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Priority: Minor
> Attachments: Async ShuffleExternalSorter.pdf
>
>
> Currently, Spark tasks are single-threaded. But we see it could greatly 
> improve the performance of the jobs, if we can multi-thread some part of it. 
> For example, profiling our map tasks, which reads large amount of data from 
> HDFS and spill to disks, we see that we are blocked on HDFS read and spilling 
> majority of the time. Since both these operations are IO intensive the 
> average CPU consumption during map phase is significantly low. In theory, 
> both HDFS read and spilling can be done in parallel if we had additional 
> memory to store data read from HDFS while we are spilling the last batch read.
> Let's say we have 1G of shuffle memory available per task. Currently, in case 
> of map task, it reads from HDFS and the records are stored in the available 
> memory buffer. Once we hit the memory limit and there is no more space to 
> store the records, we sort and spill the content to disk. While we are 
> spilling to disk, since we do not have any available memory, we can not read 
> from HDFS concurrently. 
> Here we propose supporting async spilling for UnsafeShuffleWriter, so that we 
> can support reading from HDFS when sort and spill is happening 
> asynchronously.  Let's say the total 1G of shuffle memory can be split into 
> two regions - active region and spilling region - each of size 500 MB. We 
> start with reading from HDFS and filling the active region. Once we hit the 
> limit of the active region, we issue an asynchronous spill, while flipping the 
> active region and spilling region. While the spill is happening 
> asynchronously, we still have 500 MB of memory available to read the data 
> from HDFS. This way we can amortize the high disk/network io cost during 
> spilling.
> We made a prototype hack to implement this feature and we could see our map 
> tasks were as much as 40% faster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

2017-11-26 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266102#comment-16266102
 ] 

Eyal Farago commented on SPARK-22579:
-

[~srowen], I couldn't see a place where this data is stored for later use. Even in the 'big fetch' scenario the file isn't reused, as org.apache.spark.storage.BlockManager#remoteBlockTempFileManager is only used to stream a big file once; it's never used to figure out whether the download can be avoided. Furthermore, these blocks are also not registered with the block manager, even though they could serve as a 'DISK_ONLY' cache on the requesting executor and potentially even for other executors residing on the same node.

[~jerryshao], it seems the 'big files' code path actually uses streaming, which makes me think it could be slightly modified to accept a stream handler (the current behavior can be achieved by passing in a download-to-file stream handler).

One thing that does have the potential for trouble is error recovery when more than one executor can serve the block: the current implementation simply moves on to the next one, whereas a streaming-based approach would have to request the rest of the block from another executor (assuming the replicas are identical).
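
To illustrate what I mean by accepting a stream handler, here is a rough sketch (all names are made up; this is not Spark's actual API) where the existing "download the big block to a temp file" behavior becomes just one handler among others:
{code}
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import java.nio.file.{Files, Path}

// the handler the fetch path would call back into as chunks arrive
trait BlockStreamHandler {
  def onData(chunk: Array[Byte], length: Int): Unit
  def onComplete(): Unit
}

object StreamingFetchSketch {
  // stand-in for the network layer: pumps the remote block's stream into the handler
  def fetchBlockAsStream(remote: InputStream, handler: BlockStreamHandler): Unit = {
    val buf = new Array[Byte](8 * 1024)
    var read = remote.read(buf)
    while (read != -1) {
      handler.onData(buf, read)
      read = remote.read(buf)
    }
    handler.onComplete()
  }

  // the current behaviour ("download to a temp file"), expressed as one possible handler
  final class DownloadToFileHandler(target: Path) extends BlockStreamHandler {
    private val out: OutputStream = Files.newOutputStream(target)
    def onData(chunk: Array[Byte], length: Int): Unit = out.write(chunk, 0, length)
    def onComplete(): Unit = out.close()
  }

  def main(args: Array[String]): Unit = {
    val fakeRemoteBlock = new ByteArrayInputStream(Array.fill[Byte](1 << 20)(1))
    val tmp = Files.createTempFile("block", ".bin")
    fetchBlockAsStream(fakeRemoteBlock, new DownloadToFileHandler(tmp))
    println(s"downloaded ${Files.size(tmp)} bytes to $tmp")
  }
}
{code}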

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be 
> implemented using streaming
> --
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Spark Core
>Affects Versions: 2.1.0
>Reporter: Eyal Farago
>
> when an RDD partition is cached on an executor bu the task requiring it is 
> running on another executor (process locality ANY), the cached partition is 
> fetched via BlockManager.getRemoteValues which delegates to 
> BlockManager.getRemoteBytes, both calls are blocking.
> in my use case I had a 700GB RDD spread over 1000 partitions on a 6 nodes 
> cluster, cached to disk. rough math shows that average partition size is 
> 700MB.
> looking at spark UI it was obvious that tasks running with process locality 
> 'ANY' are much slower than local tasks (~40 seconds to 8-10 minutes ratio), I 
> was able to capture thread dumps of executors executing remote tasks and got 
> this stake trace:
> {quote}Thread ID  Thread Name Thread StateThread Locks
> 1521  Executor task launch worker-1000WAITING 
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> digging into the code showed that the block manager first fetches all bytes 
> (getRemoteBytes) and then wraps it with a deserialization stream, this has 
> several draw backs:
> 1. blocking, requesting executor is blocked while the remote executor is 
> serving the block.
> 2. potentially large memory footprint on requesting executor, in my use case 
> a 700mb of raw bytes stored in a ChunkedByteBuffer.
> 3. inefficient, requesting side usual

[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

2017-11-24 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265045#comment-16265045
 ] 

Eyal Farago commented on SPARK-22579:
-

[~jerryshao], SPARK-22062 seems to solve the memory footprint issue; I'll take a deeper look at it (and pray for MapR to upgrade their Spark version).

[~srowen], I'm not sure what you mean by reading the data twice: getRemoteValues returns an iterator, and if it's called twice on the same block it will fetch the data twice even with the current implementation.
And yes, I agree I need to read the data anyway, but I don't need all of it at once: executors consume partitions via iterators, which allows for streaming/pagination behind the scenes, something the block manager currently does not implement.

BTW, this kind of streaming/pagination can play nicely with SPARK-22062 and any potential caching mechanism that might be introduced into the block manager in the future.

I'd also like to stress that what we experienced was lengthy tasks. Having these tasks 'dead in the water' waiting for the transfer to complete is a bit unfortunate, as they could have made progress with the part of the transfer that had already completed. I believe streaming/pagination or some other reactive approach can greatly improve this behavior.
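
As a rough illustration of the pagination idea (made-up code, not the block manager's API): the requesting side consumes values through an iterator that only blocks when its current page is exhausted, instead of waiting for the whole block up front.
{code}
object PagedFetchSketch {
  // fetchPage(pageIndex) returns the next page of deserialized values,
  // or None once the remote block is exhausted
  final class PagedIterator[T](fetchPage: Int => Option[Seq[T]]) extends Iterator[T] {
    private var page: Iterator[T] = Iterator.empty
    private var nextPageIndex = 0
    private var exhausted = false

    private def advance(): Unit =
      while (!page.hasNext && !exhausted) {
        fetchPage(nextPageIndex) match { // the only point that may block on the network
          case Some(values) => page = values.iterator; nextPageIndex += 1
          case None         => exhausted = true
        }
      }

    def hasNext: Boolean = { advance(); page.hasNext }
    def next(): T = { advance(); page.next() }
  }

  def main(args: Array[String]): Unit = {
    val remotePages = (1 to 10).grouped(3).toVector          // pretend these are remote pages
    val it = new PagedIterator[Int](i => remotePages.lift(i))
    println(it.sum)                                          // 55, consumed page by page
  }
}
{code}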


> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be 
> implemented using streaming
> --
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Spark Core
>Affects Versions: 2.1.0
>Reporter: Eyal Farago
>
> when an RDD partition is cached on an executor bu the task requiring it is 
> running on another executor (process locality ANY), the cached partition is 
> fetched via BlockManager.getRemoteValues which delegates to 
> BlockManager.getRemoteBytes, both calls are blocking.
> in my use case I had a 700GB RDD spread over 1000 partitions on a 6 nodes 
> cluster, cached to disk. rough math shows that average partition size is 
> 700MB.
> looking at spark UI it was obvious that tasks running with process locality 
> 'ANY' are much slower than local tasks (~40 seconds to 8-10 minutes ratio), I 
> was able to capture thread dumps of executors executing remote tasks and got 
> this stake trace:
> {quote}Thread ID  Thread Name Thread StateThread Locks
> 1521  Executor task launch worker-1000WAITING 
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> digging into the code showed that the block manager first fetches all bytes 
> (getRemoteBytes) and then wraps it with a deserialization stream, this has 
> several draw backs:
> 1. blocking, requesting executor is blocked while the remote executor is 
> serving the block.
> 2. potentially large memory footprint on requesting executor, in my use ca

[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

2017-11-22 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262120#comment-16262120
 ] 

Eyal Farago commented on SPARK-22579:
-

CC: [~hvanhovell] (we've discussed this privately), [~joshrosen], [~jerryshao2015] (git annotations point at you guys :-) )

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be 
> implemented using streaming
> --
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.1.0
>Reporter: Eyal Farago
>
> when an RDD partition is cached on an executor bu the task requiring it is 
> running on another executor (process locality ANY), the cached partition is 
> fetched via BlockManager.getRemoteValues which delegates to 
> BlockManager.getRemoteBytes, both calls are blocking.
> in my use case I had a 700GB RDD spread over 1000 partitions on a 6 nodes 
> cluster, cached to disk. rough math shows that average partition size is 
> 700MB.
> looking at spark UI it was obvious that tasks running with process locality 
> 'ANY' are much slower than local tasks (~40 seconds to 8-10 minutes ratio), I 
> was able to capture thread dumps of executors executing remote tasks and got 
> this stake trace:
> {quote}Thread ID  Thread Name Thread StateThread Locks
> 1521  Executor task launch worker-1000WAITING 
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> digging into the code showed that the block manager first fetches all bytes 
> (getRemoteBytes) and then wraps it with a deserialization stream, this has 
> several draw backs:
> 1. blocking, requesting executor is blocked while the remote executor is 
> serving the block.
> 2. potentially large memory footprint on requesting executor, in my use case 
> a 700mb of raw bytes stored in a ChunkedByteBuffer.
> 3. inefficient, requesting side usually don't need all values at once as it 
> consumes the values via an iterator.
> 4. potentially large memory footprint on serving executor, in case the block 
> is cached in deserialized form the serving executor has to serialize it into 
> a ChunkedByteBuffer (BlockManager.doGetLocalBytes). this is both memory & CPU 
> intensive, memory footprint can be reduced by using a limited buffer for 
> serialization 'spilling' to the response stream.
> I suggest improving this either by implementing full streaming mechanism or 
> some kind of pagination mechanism, in addition the requesting executor should 
> be able to make progress with the data it already has, blocking only when 
> local buffer is exhausted and remote side didn't deliver the next chunk of 
> the stream (or page in case of pagination) yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-

[jira] [Created] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

2017-11-21 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-22579:
---

 Summary: BlockManager.getRemoteValues and 
BlockManager.getRemoteBytes should be implemented using streaming
 Key: SPARK-22579
 URL: https://issues.apache.org/jira/browse/SPARK-22579
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Spark Core
Affects Versions: 2.1.0
Reporter: Eyal Farago


When an RDD partition is cached on one executor but the task requiring it is running on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues, which delegates to BlockManager.getRemoteBytes; both calls are blocking.
In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. Rough math shows that the average partition size is 700MB.
Looking at the Spark UI it was obvious that tasks running with process locality 'ANY' are much slower than local tasks (a ratio of ~40 seconds to 8-10 minutes). I was able to capture thread dumps of executors executing remote tasks and got this stack trace:

{quote}Thread ID    Thread Name    Thread State    Thread Locks
1521    Executor task launch worker-1000    WAITING 
Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
scala.concurrent.Await$.result(package.scala:190)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}

Digging into the code showed that the block manager first fetches all bytes (getRemoteBytes) and then wraps them with a deserialization stream. This has several drawbacks:
1. Blocking: the requesting executor is blocked while the remote executor is serving the block.
2. Potentially large memory footprint on the requesting executor; in my use case, 700MB of raw bytes stored in a ChunkedByteBuffer.
3. Inefficient: the requesting side usually doesn't need all values at once, as it consumes the values via an iterator.
4. Potentially large memory footprint on the serving executor: in case the block is cached in deserialized form, the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU intensive; the memory footprint can be reduced by using a limited buffer for serialization, 'spilling' to the response stream.

I suggest improving this by implementing either a full streaming mechanism or some kind of pagination mechanism. In addition, the requesting executor should be able to make progress with the data it already has, blocking only when the local buffer is exhausted and the remote side hasn't delivered the next chunk of the stream (or the next page, in the case of pagination) yet.
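
As a rough sketch of the serving-side part of point 4 (made-up names, not actual BlockManager code): serializing the cached values straight into the response stream through a small fixed-size buffer keeps the memory footprint bounded by the buffer size rather than the block size.
{code}
import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream, OutputStream}

object StreamingServeSketch {
  // serializes values one by one into the (network) response stream; memory use is
  // bounded by the buffer size, not by the size of the cached block
  def serveBlock[T <: AnyRef](values: Iterator[T],
                              responseStream: OutputStream,
                              bufferBytes: Int = 64 * 1024): Unit = {
    val out = new ObjectOutputStream(new BufferedOutputStream(responseStream, bufferBytes))
    try values.foreach(v => out.writeObject(v))
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val fakeResponse = new ByteArrayOutputStream() // stand-in for the network channel
    serveBlock(Iterator.range(0, 1000).map(i => s"record-$i"), fakeResponse)
    println(s"streamed ${fakeResponse.size()} serialized bytes")
  }
}
{code}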



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()

2017-09-10 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160366#comment-16160366
 ] 

Eyal Farago commented on SPARK-21907:
-

opened PR: https://github.com/apache/spark/pull/19181

> NullPointerException in UnsafeExternalSorter.spill()
> 
>
> Key: SPARK-21907
> URL: https://issues.apache.org/jira/browse/SPARK-21907
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Juliusz Sompolski
>
> I see NPE during sorting with the following stacktrace:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43)
>   at 
> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259)
>   at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.u

[jira] [Commented] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()

2017-09-09 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160062#comment-16160062
 ] 

Eyal Farago commented on SPARK-21907:
-

[~juliuszsompolski], I've followed the stack trace you attached (as much as possible with master's code) and I tend to agree with your assumption about UnsafeInMemorySorter.reset.
It seems that allocateArray failed with an OOM and triggered a nested spill; the problem is that by this point the array points to an already freed block, so the first time the array is actually accessed (TimSort invokes the comparator) it fails with an NPE (assuming asserts are really turned off in your environment).

I think the way to solve this is to temporarily set the array (and the relevant pos, capacity, etc.) to null/zero/some other value indicating a currently unreadable/empty buffer.

[~kiszk], what do you think?
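
To illustrate the suggested fix, here is a simplified sketch (not the real UnsafeInMemorySorter, which is Java and works on off-heap memory): reset() marks the sorter empty before the reallocation, so a nested spill triggered by that allocation sees an empty sorter instead of touching a freed buffer.
{code}
object ResetSketch {
  final class ToySorter(allocate: Int => Array[Long]) {
    private var array: Array[Long] = allocate(16)
    private var pos: Int = 0

    def insert(v: Long): Unit = { array(pos) = v; pos += 1 }

    // called from a nested spill; must not touch freed memory
    def spill(): Seq[Long] =
      if (array == null || pos == 0) Seq.empty // sorter is mid-reset: nothing to spill
      else array.take(pos).sorted.toSeq

    def reset(): Unit = {
      array = null         // mark the sorter empty before reallocating
      pos = 0
      array = allocate(16) // may hit an OOM and trigger a nested spill()
    }
  }

  def main(args: Array[String]): Unit = {
    var sorter: ToySorter = null
    // an allocator that "spills" the sorter when asked for memory, as the
    // task memory manager may do under memory pressure
    val allocator: Int => Array[Long] = { n =>
      if (sorter != null) println(s"nested spill returned ${sorter.spill().size} records")
      new Array[Long](n)
    }
    sorter = new ToySorter(allocator)
    (1L to 5L).foreach(sorter.insert)
    sorter.reset() // the nested spill sees an empty sorter instead of a freed array
  }
}
{code}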

> NullPointerException in UnsafeExternalSorter.spill()
> 
>
> Key: SPARK-21907
> URL: https://issues.apache.org/jira/browse/SPARK-21907
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Juliusz Sompolski
>
> I see NPE during sorting with the following stacktrace:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43)
>   at 
> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259)
>   at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegen

[jira] [Commented] (SPARK-3151) DiskStore attempts to map any size BlockId without checking MappedByteBuffer limit

2017-08-08 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118347#comment-16118347
 ] 

Eyal Farago commented on SPARK-3151:


created PR 18855: https://github.com/apache/spark/pull/18855

> DiskStore attempts to map any size BlockId without checking MappedByteBuffer 
> limit
> --
>
> Key: SPARK-3151
> URL: https://issues.apache.org/jira/browse/SPARK-3151
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.0.2
> Environment: IBM 64-bit JVM PPC64
>Reporter: Damon Brown
>Priority: Minor
>
> [DiskStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskStore.scala]
>  attempts to memory map the block file in {{def getBytes}}.  If the file is 
> larger than 2GB (Integer.MAX_VALUE) as specified by 
> [FileChannel.map|http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map%28java.nio.channels.FileChannel.MapMode,%20long,%20long%29],
>  then the memory map fails.
> {code}
> Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) # line 
> 104
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3151) DiskStore attempts to map any size BlockId without checking MappedByteBuffer limit

2017-07-05 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074392#comment-16074392
 ] 

Eyal Farago commented on SPARK-3151:


I just hit this with Spark 2.1 when processing a disk-persisted RDD.
While the root cause is probably data skew, this seems like a severe limitation in Spark.
This specific failure is a bit surprising, as Spark already knows the block size on disk at this point (it maps the entire file from offset 0 to the file size), so it should easily be possible to split this into several chunks; after all, the code is already using ChunkedByteBuffer.
A better approach would be lazily loading the chunks as deserialization progresses, and an even better solution (proposed by [~matei] in a 
[comment|https://issues.apache.org/jira/browse/SPARK-1476?focusedCommentId=13967947&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13967947]
 for SPARK-1476) would be splitting these blocks at higher levels of Spark (a mapper that produces multiple blocks, a cached RDD partition that consists of several blocks...).

I can see the attached PR is closed; is there an expected/in-progress fix for this?
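
For illustration, the straightforward workaround could look roughly like this (a sketch, not DiskStore's actual code): since the file size is known up front, map the block file as a sequence of buffers, each bounded by Integer.MAX_VALUE, instead of a single FileChannel.map call over the whole file.
{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel.MapMode

object ChunkedMapSketch {
  // maps [0, file length) as a sequence of buffers, each at most chunkSize bytes
  def mapInChunks(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
    val raf = new RandomAccessFile(path, "r")
    try {
      val channel = raf.getChannel
      val length = channel.size()
      (0L until length by chunkSize).map { offset =>
        channel.map(MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
      }
    } finally raf.close()
  }

  def main(args: Array[String]): Unit = {
    val path = java.nio.file.Files.createTempFile("block", ".bin")
    java.nio.file.Files.write(path, Array.fill[Byte](1024)(7))
    val chunks = mapInChunks(path.toString, chunkSize = 256)
    println(s"${chunks.size} chunks, ${chunks.map(_.remaining()).sum} bytes total") // 4 chunks, 1024 bytes
  }
}
{code}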


2017-07-05 07:04:51,368 UTC WARNtask-result-getter-0
org.apache.spark.scheduler.TaskSetManagerLost task 131.0 in stage 14.0 
(TID 2228, 172.20.1.137, executor 1): java.lang.IllegalArgumentException: Size 
exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at 
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:465)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:701)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

> DiskStore attempts to map any size BlockId without checking MappedByteBuffer 
> limit
> --
>
> Key: SPARK-3151
> URL: https://issues.apache.org/jira/browse/SPARK-3151
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.0.2
> Environment: IBM 64-bit JVM PPC64
>Reporter: Damon Brown
>Priority: Minor
>
> [DiskStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskStore.scala]
>  attempts to memory map the block file in {{def getBytes}}.  If the file is 
> larger than 2GB (Integer.MAX_VALUE) as specified by 
> [FileChannel.map|http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map%28java.nio.channels.FileChannel.MapMode,%20long,%20long%29],
>  then the memory map fails.
> {code}
> Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) # line 
> 104
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2017-04-08 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15961954#comment-15961954
 ] 

Eyal Farago commented on SPARK-19870:
-

[~stevenruppert], logs from the hung executor might shed some light on this.
Your comment about the JVM machinery not allowing the program to make progress is not completely accurate: the lock on the BlockInfoManager isn't really held when entering wait(), and the other locks held up the stack are not required in order to reach the BlockInfoManager's lock. My guess is that this block is somehow dropped from memory by other tasks running on this executor (the drop code path does not go through the TorrentBroadcast code path, hence it can make progress without locking it).
[~joshrosen], it seems my 'theory' regarding the completion iterator does not hold, as it invokes the completion callback on a hasNext() call that returns false, which just doesn't happen in this case.

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
> Attachments: stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> 

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2017-04-06 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959169#comment-15959169
 ] 

Eyal Farago commented on SPARK-19870:
-

Steven, I meant traces produced by logging.
Sean, this looks like a missed notification: one thread holds a lock and waits for a notification (via Java's synchronized/wait/notify mechanism), while a second thread attempts to synchronize on the same object; the first keeps waiting for a notification and condition that never arrive.
This code is annotated with lots of trace statements that can shed some light on the issue.
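
For reference, this is the shape of the pattern I mean (a minimal sketch, not the BlockInfoManager itself): the waiter loops on a condition inside synchronized/wait, and it only wakes up if whoever changes the condition also calls notifyAll() on the same monitor; changing the condition without going through that notification leaves the waiter parked forever.
{code}
object MissedNotificationSketch {
  private val monitor = new Object
  private var writerHoldsBlock = true

  def lockForReading(): Unit = monitor.synchronized {
    while (writerHoldsBlock) monitor.wait() // releases the monitor while parked
  }

  def releaseWriteLock(): Unit = monitor.synchronized {
    writerHoldsBlock = false
    monitor.notifyAll() // comment this out to reproduce the "missed notification" hang
  }

  def main(args: Array[String]): Unit = {
    val reader = new Thread(() => { lockForReading(); println("read lock acquired") })
    reader.start()
    Thread.sleep(100) // let the reader park first
    releaseWriteLock()
    reader.join()
  }
}
{code}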

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
> Attachments: stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.r

[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast

2017-04-06 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958894#comment-15958894
 ] 

Eyal Farago commented on SPARK-19870:
-

[~stevenruppert], can you attach traces?
[~joshrosen], a glance at the TorrentBroadcast code shows that it explicitly 
[releases the 
read-lock|https://github.com/apache/spark/blob/d009fb369bbea0df81bbcf9c8028d14cfcaa683b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L214]
 on the block, even though the [iterator obtained from the 
BlockResult|https://github.com/apache/spark/blob/d009fb369bbea0df81bbcf9c8028d14cfcaa683b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L497]
 is a completion iterator that releases locks (the read lock in this scenario) 
upon completion.

Is it possible that the torrent code 'double-releases' the read lock on the 
relevant block, putting it into an inconsistent state that prevents the next 
read from obtaining the lock (or somehow fails to notify on release)?

[~joshrosen], it seems at least part of this is related to your work on 
SPARK-12757, hence tagging you.
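
To illustrate what I mean by a 'double release' (purely a sketch with made-up 
names; this is not the BlockInfoManager implementation): if the read lock is 
tracked as a simple reader count, releasing it once from a completion-iterator 
callback and once explicitly can drive the bookkeeping into a state no later 
reader expects.
{code}
// Hypothetical sketch, not Spark code: a naive reader count released twice.
class NaiveReadLock {
  private var readers = 0

  def acquire(): Unit = synchronized { readers += 1 }

  def release(): Unit = synchronized {
    readers -= 1 // a second release for the same acquire drives this negative
    if (readers <= 0) notifyAll()
  }

  def currentReaders: Int = synchronized(readers)
}

object DoubleReleaseSketch {
  def main(args: Array[String]): Unit = {
    val lock = new NaiveReadLock
    lock.acquire()
    lock.release() // e.g. released by a completion-iterator callback
    lock.release() // released again explicitly
    println(s"readers after double release: ${lock.currentReaders}") // -1
  }
}
{code}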

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
> 
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Shuffle
>Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, 
> yarn coarse-grained.
>Reporter: Steven Ruppert
> Attachments: stack.txt
>
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, 
> with several shuffles, a cached RDD, and finally a conversion to DataFrame to 
> save to parquet. I get a repeatable deadlock at the very last reducers of one 
> of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 
> tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry 
> [0x7fffb95f3000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
> - waiting to lock <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x0005b12f2290> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and 
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 
> tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
> - locked <0x000545736b58> (a 
> org.apache.spark.storage.BlockInfoManager)
> at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
> - locked <0x0005445cfc00> (a 
> org.apache.spark.broadcast.TorrentBroadcast$)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
> at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
> - locked <0x00059711eb10> (a 
> org.apache.spark.broadcast.TorrentBroadcast)
> at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
> at 
> org.apa

[jira] [Commented] (SPARK-18736) CreateMap allows non-unique keys

2017-02-02 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851078#comment-15851078
 ] 

Eyal Farago commented on SPARK-18736:
-

SPARK-8601 is making (slow) progress; it's actually in the final stages of 
review now.
One thing to notice is keeping direct evaluation, code generation and the 
optimized version in sync: SPARK-8601 follows the current "first wins" behavior 
by transforming map(...).get(k) into a CaseKeyWhen(k, ...).
If we decide to choose a "last wins" approach, the optimized version would 
probably have to reverse the order of the keys.
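
For reference, the "first wins" semantics the rewrite has to preserve can be 
sketched in plain Scala (illustrative only, not Catalyst code): a lookup over 
CreateMap's flat key/value argument list is equivalent to a CASE WHEN over the 
keys in order, so the first matching key decides the result.
{code}
// Plain-Scala sketch of the "first wins" lookup semantics (not Catalyst code).
object FirstWinsLookup {
  // GetMapValue-style lookup over CreateMap's (k1 -> v1, k2 -> v2, ...) pairs
  def lookup[K, V](kvs: Seq[(K, V)], key: K): Option[V] =
    kvs.collectFirst { case (k, v) if k == key => v } // first match wins

  def main(args: Array[String]): Unit = {
    val kvs = Seq(1 -> 11, 1 -> 12) // duplicate key, as in CreateMap(1, 11, 1, 12)
    println(lookup(kvs, 1))         // Some(11), i.e. "first wins"
    // A "last wins" policy would effectively scan the keys in reverse order:
    println(lookup(kvs.reverse, 1)) // Some(12)
  }
}
{code}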

> CreateMap allows non-unique keys
> 
>
> Key: SPARK-18736
> URL: https://issues.apache.org/jira/browse/SPARK-18736
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Eyal Farago
>  Labels: map, sql, types
>
> Spark-Sql, {{CreateMap}} does not enforce unique keys, i.e. it's possible to 
> create a map with two identical keys: 
> {noformat}
> CreateMap(Literal(1), Literal(11), Literal(1), Literal(12))
> {noformat}
> This does not behave like standard maps in common programming languages.
> Proper behavior should be chosen:
> # first 'wins'
> # last 'wins'
> # runtime error.
> {{GetMapValue}} currently implements option #1. Even if this is the desired 
> behavior, {{CreateMap}} should return a unique map.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18736) CreateMap allows non-unique keys

2016-12-06 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15726232#comment-15726232
 ] 

Eyal Farago commented on SPARK-18736:
-

@Shuai Lin,
I already have a PR in progress that addresses the possible literal-key 
optimizations during the optimization phase (SPARK-8601). Can you please 
implement the proper behavior for runtime evaluation and code generation, and 
update the JIRA with the chosen approach? This way we can align the two PRs.

Thanks,
Eyal.




> CreateMap allows non-unique keys
> 
>
> Key: SPARK-18736
> URL: https://issues.apache.org/jira/browse/SPARK-18736
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Eyal Farago
>  Labels: map, sql, types
>
> Spark-Sql, {{CreateMap}} does not enforce unique keys, i.e. it's possible to 
> create a map with two identical keys: 
> {noformat}
> CreateMap(Literal(1), Literal(11), Literal(1), Literal(12))
> {noformat}
> This does not behave like standard maps in common programming languages.
> Proper behavior should be chosen:
> # first 'wins'
> # last 'wins'
> # runtime error.
> {{GetMapValue}} currently implements option #1. Even if this is the desired 
> behavior, {{CreateMap}} should return a unique map.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18736) [SQL] CreateMap allow non-unique keys

2016-12-05 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-18736:
---

 Summary: [SQL] CreateMap allow non-unique keys
 Key: SPARK-18736
 URL: https://issues.apache.org/jira/browse/SPARK-18736
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Eyal Farago


In Spark SQL, CreateMap does not enforce unique keys, i.e. it's possible to 
create a map with two identical keys: 
CreateMap(Literal(1), Literal(11), Literal(1), Literal(12))

This does not behave like standard maps in common programming languages.
The proper behavior should be chosen:
1. first 'wins'
2. last 'wins'
3. runtime error.

Currently GetMapValue implements option #1. Even if this is the desired 
behavior, CreateMap should return a unique map.
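
As a rough illustration of the three options (plain Scala, not the CreateMap 
implementation), a duplicated key list such as (1 -> 11, 1 -> 12) could be 
normalized into a unique map as follows:
{code}
// Plain-Scala sketch of the three proposed policies for duplicate keys.
object UniqueKeyPolicies {
  def firstWins(kvs: Seq[(Int, Int)]): Map[Int, Int] =
    kvs.foldLeft(Map.empty[Int, Int]) { case (m, (k, v)) =>
      if (m.contains(k)) m else m + (k -> v) // keep the first occurrence
    }

  def lastWins(kvs: Seq[(Int, Int)]): Map[Int, Int] =
    kvs.toMap // Scala's toMap keeps the last occurrence of a key

  def failOnDuplicate(kvs: Seq[(Int, Int)]): Map[Int, Int] = {
    require(kvs.map(_._1).distinct.size == kvs.size, "duplicate map key")
    kvs.toMap
  }

  def main(args: Array[String]): Unit = {
    val kvs = Seq(1 -> 11, 1 -> 12)
    println(firstWins(kvs)) // Map(1 -> 11)
    println(lastWins(kvs))  // Map(1 -> 12)
    // failOnDuplicate(kvs) would throw an IllegalArgumentException
  }
}
{code}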



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14804) Graph vertexRDD/EdgeRDD checkpoint results ClassCastException:

2016-10-09 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561452#comment-15561452
 ] 

Eyal Farago commented on SPARK-14804:
-

I think this relates to SPARK-12431; is it possible to 'merge' both efforts?

> Graph vertexRDD/EdgeRDD checkpoint results ClassCastException: 
> ---
>
> Key: SPARK-14804
> URL: https://issues.apache.org/jira/browse/SPARK-14804
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.6.1
>Reporter: SuYan
>Priority: Minor
>
> {code}
> graph3.vertices.checkpoint()
> graph3.vertices.count()
> graph3.vertices.map(_._2).count()
> {code}
> 16/04/21 21:04:43 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 
> (TID 13, localhost): java.lang.ClassCastException: 
> org.apache.spark.graphx.impl.ShippableVertexPartition cannot be cast to 
> scala.Tuple2
>   at 
> com.xiaomi.infra.codelab.spark.Graph2$$anonfun$main$1.apply(Graph2.scala:80)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:91)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:219)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> look at the code:
> {code}
>   private[spark] def computeOrReadCheckpoint(split: Partition, context: 
> TaskContext): Iterator[T] =
>   {
> if (isCheckpointedAndMaterialized) {
>   firstParent[T].iterator(split, context)
> } else {
>   compute(split, context)
> }
>   }
>  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
>  override def isCheckpointed: Boolean = {
>firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
>  }
> {code}
> For VertexRDD or EdgeRDD, the first parent is its partitionRDD: 
> RDD[ShippableVertexPartition[VD]] / RDD[(PartitionID, EdgePartition[ED, VD])]
> 1. We call vertexRDD.checkpoint; its partitionRDD will be checkpointed, so 
> VertexRDD.isCheckpointedAndMaterialized=true.
> 2. Then we call vertexRDD.iterator; because isCheckpointed=true it calls 
> firstParent.iterator (which is not a CheckpointRDD, but actually the 
> partitionRDD). 
>  
> So the returned iterator is Iterator[ShippableVertexPartition], not the 
> expected Iterator[(VertexId, VD)].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12431) add local checkpointing to GraphX

2016-10-09 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561437#comment-15561437
 ] 

Eyal Farago commented on SPARK-12431:
-

I think this heavily relates to SPARK-14804; assuming this is an ongoing 
effort, I think it actually depends on SPARK-14804's resolution.


> add local checkpointing to GraphX
> -
>
> Key: SPARK-12431
> URL: https://issues.apache.org/jira/browse/SPARK-12431
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX
>Affects Versions: 1.5.2
>Reporter: Edward Seidl
>
> local checkpointing was added to RDD to speed up iterative spark jobs, but 
> this capability hasn't been added to GraphX.  Adding localCheckpoint to 
> GraphImpl, EdgeRDDImpl, and VertexRDDImpl greatly improved the speed of a 
> k-core algorithm I'm using (at the cost of fault tolerance, of course).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16839) CleanupAliases may leave redundant aliases at end of analysis state

2016-08-01 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-16839:
---

 Summary: CleanupAliases may leave redundant aliases at end of 
analysis state
 Key: SPARK-16839
 URL: https://issues.apache.org/jira/browse/SPARK-16839
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 1.6.1
Reporter: Eyal Farago
Priority: Minor


[SPARK-9634] [SPARK-9323] [SQL] introduced CleanupAliases, which removes 
unnecessary Aliases while keeping required ones such as top-level Projections 
and struct attributes. This mechanism is implemented by maintaining a boolean 
flag during a top-down expression transformation; I found a case where it 
leaves redundant aliases in the tree (within a right sibling of a 
create_struct node).
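
For illustration only, here is a hypothetical query shape of the kind described 
(not a confirmed reproduction, just the rough structure: a struct whose field 
aliases must be kept, next to an aliased right sibling):
{code}
// Hypothetical shape only, not a confirmed reproduction of the issue.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RedundantAliasShape {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("alias-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
    val q = df.select(
      struct($"a".as("x"), $"b".as("y")).as("s"), // aliases inside the struct must be kept
      ($"c" + 1).as("z")                          // aliased right sibling of the struct
    )
    // Inspecting the analyzed plan is where leftover Aliases would show up.
    println(q.queryExecution.analyzed)
    spark.stop()
  }
}
{code}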



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16791) casting structs fails on Timestamp fields (interpreted mode only)

2016-07-29 Thread Eyal Farago (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15398916#comment-15398916
 ] 

Eyal Farago commented on SPARK-16791:
-

created pull request https://github.com/apache/spark/pull/14400

> casting structs fails on Timestamp fields (interpreted mode only)
> -
>
> Key: SPARK-16791
> URL: https://issues.apache.org/jira/browse/SPARK-16791
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Eyal Farago
>Priority: Minor
>  Labels: cast, expressions, sql
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When casting a struct with a Timestamp field, a MatchError is thrown (pasted 
> below).
> The root cause for this is in 
> org.apache.spark.sql.catalyst.expressions.Cast#cast 
> (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L419).
> case dt if dt == child.dataType => identity[Any]
> should be modified to:
> case dt if dt == from => identity[Any]
> It seems to explode for Timestamp because 
> org.apache.spark.sql.catalyst.expressions.Cast#castToTimestamp doesn't have 
> an identity check or fallback case in its pattern matching.
> I'll shortly open a pull request with a failing test case and a fix.
> Caused by: scala.MatchError: TimestampType (of class 
> org.apache.spark.sql.types.TimestampType$)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.castToTimestamp(Cast.scala:185)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:424)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:403)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:402)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.castStruct(Cast.scala:402)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:435)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:443)
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:443)
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:445)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:324)
>   at 
> org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.evaluate(ExpressionEvalHelper.scala:80)
>   at 
> org.apache.spark.sql.catalyst.expressions.CastSuite.evaluate(CastSuite.scala:33)
>   ... 58 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16791) casting structs fails on Timestamp fields (interpreted mode only)

2016-07-29 Thread Eyal Farago (JIRA)
Eyal Farago created SPARK-16791:
---

 Summary: casting structs fails on Timestamp fields (interpreted 
mode only)
 Key: SPARK-16791
 URL: https://issues.apache.org/jira/browse/SPARK-16791
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 1.6.1
Reporter: Eyal Farago
Priority: Minor


When casting a struct with a Timestamp field, a MatchError is thrown (pasted 
below).
The root cause for this is in 
org.apache.spark.sql.catalyst.expressions.Cast#cast 
(https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L419).

case dt if dt == child.dataType => identity[Any]

should be modified to:

case dt if dt == from => identity[Any]

It seems to explode for Timestamp because 
org.apache.spark.sql.catalyst.expressions.Cast#castToTimestamp doesn't have an 
identity check or fallback case in its pattern matching.

I'll shortly open a pull request with a failing test case and a fix.

Caused by: scala.MatchError: TimestampType (of class 
org.apache.spark.sql.types.TimestampType$)
at 
org.apache.spark.sql.catalyst.expressions.Cast.castToTimestamp(Cast.scala:185)
at 
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:424)
at 
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:403)
at 
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:402)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at 
org.apache.spark.sql.catalyst.expressions.Cast.castStruct(Cast.scala:402)
at 
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:435)
at 
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:443)
at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:443)
at 
org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:445)
at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:324)
at 
org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.evaluate(ExpressionEvalHelper.scala:80)
at 
org.apache.spark.sql.catalyst.expressions.CastSuite.evaluate(CastSuite.scala:33)
... 58 more
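
A simplified, standalone sketch of why the comparison matters (hypothetical 
types, not the real Cast code): the identity short-circuit should compare 
against the type being cast at the current level of the recursion ('from'), 
not the top-level child's type; otherwise a Timestamp field nested inside a 
struct falls into a per-type branch with no Timestamp case and raises a 
MatchError like the one above.
{code}
// Hedged sketch with made-up types; not the actual Cast implementation.
sealed trait SimpleType
case object IntType extends SimpleType
case object TimestampType extends SimpleType
case class StructType(fields: Seq[(String, SimpleType)]) extends SimpleType

object CastSketch {
  def cast(from: SimpleType, to: SimpleType): Any => Any = to match {
    case dt if dt == from => identity[Any] // the fix: compare with 'from', the type at this level
    case StructType(toFields) =>
      val fromFields = from.asInstanceOf[StructType].fields
      val converters = fromFields.zip(toFields).map {
        case ((_, f), (_, t)) => cast(f, t) // recurse with the *field* types
      }
      (v: Any) => v.asInstanceOf[Seq[Any]].zip(converters).map { case (x, c) => c(x) }
    case IntType => (v: Any) => v.toString.toInt
    // deliberately no TimestampType branch here, mirroring castToTimestamp's
    // missing identity/fallback case described above
  }

  def main(args: Array[String]): Unit = {
    val from = StructType(Seq("ts" -> TimestampType))
    val to   = StructType(Seq("renamed" -> TimestampType)) // differs only by field name
    val converter = cast(from, to)
    println(converter(Seq(1234567890L))) // the nested Timestamp field hits the identity case
  }
}
{code}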



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org