[jira] [Commented] (SPARK-28304) FileFormatWriter introduces an uncoditional sort, even when all attributes are constants
[ https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889436#comment-16889436 ]

Eyal Farago commented on SPARK-28304:

[~joshrosen], thanks for your comment. I think this is a broader subject than just FileFormatWriter, as any sort operator can either simplify its ordering or be eliminated completely when some or all of the sort columns are known to be constant. Furthermore, in some cases one ordering can satisfy several ordering requirements (or, seen from the other side, the ordering requirements of downstream operators can be relaxed), so I believe this is best handled by the optimizer/planner, essentially by making EnsureRequirements aware of these kinds of cases.

As a short-term fix, the SortExec operator can filter away constant ordering columns in the execute() method, and bypass the sort altogether if it is left with no ordering columns.

BTW, is there any reason FileFormatWriter doesn't take the regular optimizing/planning path?

> FileFormatWriter introduces an uncoditional sort, even when all attributes are constants
>
> Key: SPARK-28304
> URL: https://issues.apache.org/jira/browse/SPARK-28304
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: Eyal Farago
> Priority: Major
> Labels: performance
>
> FileFormatWriter derives a required sort order based on the partition columns, bucketing columns and explicitly required ordering. However, in some use cases some (or even all) of these fields are constant; in these cases the sort can be skipped.
> E.g. in my use case we add a GUUID column identifying a specific (incremental) load; this can be thought of as a batch id. Since we run one batch at a time, this column is always constant, which means there's no need to sort on it. Since we don't use bucketing or require an explicit ordering, the entire sort can be skipped in our case.
>
> I suggest:
> # filter away constant columns from the required ordering calculated by FileFormatWriter.
> # generalize this to any Sort operator in a Spark plan.
> # introduce optimizer rules to remove constants from the sort ordering, potentially eliminating the sort operator altogether.
> # modify EnsureRequirements to be aware of constant fields when deciding whether to introduce a sort or not.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
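The idea of dropping constant columns from a required ordering, and skipping the sort when nothing is left, can be sketched in a few lines. This is a plain-Python model under stated assumptions, not Spark code: `SortKey`, `prune_constant_keys` and `sort_rows` are hypothetical names, and "constant" is modelled as a flag known up front, whereas in Spark that knowledge would have to be derived from the plan.

```python
from dataclasses import dataclass
from typing import Dict, List, Sequence


@dataclass(frozen=True)
class SortKey:
    """Hypothetical stand-in for one entry of a required ordering."""
    column: str
    is_constant: bool = False  # assumption: constness is already known


def prune_constant_keys(ordering: Sequence[SortKey]) -> List[SortKey]:
    """Drop keys whose value is identical for every row; they cannot affect row order."""
    return [key for key in ordering if not key.is_constant]


def sort_rows(rows: List[Dict], ordering: Sequence[SortKey]) -> List[Dict]:
    """Sort by the non-constant keys only, or skip the sort if none remain."""
    effective = prune_constant_keys(ordering)
    if not effective:
        return rows  # nothing left to order by: bypass the sort entirely
    return sorted(rows, key=lambda row: tuple(row[k.column] for k in effective))


rows = [
    {"batch_id": "b-1", "value": 3},
    {"batch_id": "b-1", "value": 1},
]
# batch_id is the only (constant) ordering column, so the input comes back untouched.
unsorted_result = sort_rows(rows, [SortKey("batch_id", is_constant=True)])
# With an additional non-constant key, only that key drives the sort.
sorted_result = sort_rows(rows, [SortKey("batch_id", is_constant=True), SortKey("value")])
```

The same pruning step could in principle run either at execution time (the short-term fix) or as a planning rule; the sketch does not take a position on where it belongs.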
[jira] [Commented] (SPARK-28304) FileFormatWriter introduces an uncoditional sort, even when all attributes are constants
[ https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880402#comment-16880402 ]

Eyal Farago commented on SPARK-28304:

cc [~cloud_fan], [~hvanhovell]
[jira] [Updated] (SPARK-28304) FileFormatWriter introduces an uncoditional sort, even when all attributes are constants
[ https://issues.apache.org/jira/browse/SPARK-28304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eyal Farago updated SPARK-28304:

Summary: FileFormatWriter introduces an uncoditional sort, even when all attributes are constants (was: FileFormatWriter introduces an uncoditional join, even when all attributes are constants)
[jira] [Created] (SPARK-28304) FileFormatWriter introduces an uncoditional join, even when all attributes are constants
Eyal Farago created SPARK-28304:

Summary: FileFormatWriter introduces an uncoditional join, even when all attributes are constants
Key: SPARK-28304
URL: https://issues.apache.org/jira/browse/SPARK-28304
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.2
Reporter: Eyal Farago

FileFormatWriter derives a required sort order based on the partition columns, bucketing columns and explicitly required ordering. However, in some use cases some (or even all) of these fields are constant; in these cases the sort can be skipped.

E.g. in my use case we add a GUUID column identifying a specific (incremental) load; this can be thought of as a batch id. Since we run one batch at a time, this column is always constant, which means there's no need to sort on it. Since we don't use bucketing or require an explicit ordering, the entire sort can be skipped in our case.

I suggest:
# filter away constant columns from the required ordering calculated by FileFormatWriter.
# generalize this to any Sort operator in a Spark plan.
# introduce optimizer rules to remove constants from the sort ordering, potentially eliminating the sort operator altogether.
# modify EnsureRequirements to be aware of constant fields when deciding whether to introduce a sort or not.
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840498#comment-16840498 ]

Eyal Farago commented on SPARK-24437:

[~mgaido], looking at this again, I suspect that in this case losing the broadcast does not necessarily mean losing the lineage, as the broadcast was built by executing a plan, and this plan is still available in case the broadcast isn't.

I think the usage of broadcast variables inside Spark SQL is different from using them directly in the RDD API. In the RDD API the broadcast data originates at the driver, and Spark has no way of reconstructing it. In the Spark SQL scenario the broadcast data is built by Spark, and the knowledge of how to reconstruct it is still available to Spark. So in the rare event that both the broadcast and some of the cached partitions are lost, it's possible to reconstruct the broadcast value (by executing the plan that produced it in the first place) and then recompute the missing partitions.

I think it's at least theoretically possible to identify cached plans that are based on broadcast data and change the relevant operators to reference the broadcast variable via a soft/weak reference. Unfortunately, I believe it would turn out to be quite difficult, especially in the face of non-deterministic operators.

> Memory leak in UnsafeHashedRelation
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: gagan taneja
> Priority: Major
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
> There seems to be a memory leak with org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS.
> With each query execution requiring a Broadcast Join, UnsafeHashedRelation is getting added for cleanup in ContextCleaner. This reference to UnsafeHashedRelation is being held in some other Collection and does not become eligible for GC, and because of this ContextCleaner is not able to clean it.
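The soft/weak reference idea from the comment above can be illustrated with a toy model. This is only a sketch under loud assumptions: `BroadcastValue` and `RebuildableBroadcast` are hypothetical names, the "plan" is just a Python callable, and rebuilding is only safe here because that callable is deterministic, which is exactly the difficulty the comment raises for non-deterministic operators.

```python
import gc
import weakref
from typing import Callable, Optional, Tuple


class BroadcastValue:
    """Hypothetical stand-in for a materialized broadcast relation."""

    def __init__(self, rows):
        self.rows = rows


class RebuildableBroadcast:
    """Holds the value weakly and re-runs the producing 'plan' when it is gone."""

    def __init__(self, build: Callable[[], BroadcastValue]):
        self._build = build  # assumption: deterministic, so a rebuild yields the same data
        self._ref: Optional[weakref.ReferenceType] = None
        self.build_count = 0

    def get(self) -> BroadcastValue:
        value = self._ref() if self._ref is not None else None
        if value is None:
            value = self._build()
            self.build_count += 1
            self._ref = weakref.ref(value)
        return value


def demo() -> Tuple[bool, int]:
    holder = RebuildableBroadcast(lambda: BroadcastValue([("k1", 1), ("k2", 2)]))
    first = holder.get()
    reused = holder.get() is first  # still strongly referenced, so no rebuild
    del first                       # drop the last strong reference
    gc.collect()                    # make reclamation explicit on non-refcounting runtimes
    holder.get()                    # the weak reference is dead: the value is rebuilt
    return reused, holder.build_count


demo_result = demo()
```

The holder never keeps the value alive by itself, so memory is released as soon as no operator uses it, at the price of a possible recomputation later.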
[jira] [Commented] (SPARK-17556) Executor side broadcast for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-17556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788206#comment-16788206 ]

Eyal Farago commented on SPARK-17556:

Why was this abandoned? [~viirya]'s pull request seems promising.

I think the last comment by [~LI,Xiao] applies to the current implementation as well, since executors hold the entire broadcast anyway (assuming they ran a task that used it), so the memory footprint on the executor side doesn't change. Regarding the performance regression in the case of multiple smaller partitions, this also applies to the current implementation, as the RDD partitions have to be calculated and transferred to the driver.

One thing I personally think could be improved in [~viirya]'s PR is the requirement for the RDD to be pre-persisted. I think blocks could be evaluated in the mapPartition operation performed in the newly introduced RDD.broadcast method, which would have resolved most of the comments by [~holdenk_amp] in the PR.

> Executor side broadcast for broadcast joins
>
> Key: SPARK-17556
> URL: https://issues.apache.org/jira/browse/SPARK-17556
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core, SQL
> Reporter: Reynold Xin
> Priority: Major
> Attachments: executor broadcast.pdf, executor-side-broadcast.pdf
>
> Currently in Spark SQL, in order to perform a broadcast join, the driver must collect the result of an RDD and then broadcast it. This introduces some extra latency. It might be possible to broadcast directly from executors.
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730301#comment-16730301 ]

Eyal Farago commented on SPARK-22579:

Glanced over this on my cell; it seems SPARK-25905 only addresses the reading side. When the executor serving this block has all values in memory, a similar issue happens on that executor as well.

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Spark Core
> Affects Versions: 2.1.0
> Reporter: Eyal Farago
> Priority: Major
>
> When an RDD partition is cached on an executor but the task requiring it is running on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues, which delegates to BlockManager.getRemoteBytes; both calls are blocking.
> In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. Rough math shows that the average partition size is 700MB.
> Looking at the Spark UI, it was obvious that tasks running with process locality 'ANY' are much slower than local tasks (~40 seconds versus 8-10 minutes). I was able to capture thread dumps of executors executing remote tasks and got this stack trace:
> {quote}Thread ID Thread Name Thread State Thread Locks
> 1521 Executor task launch worker-1000 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> Digging into the code showed that the block manager first fetches all bytes (getRemoteBytes) and then wraps them with a deserialization stream. This has several drawbacks:
> 1. Blocking: the requesting executor is blocked while the remote executor is serving the block.
> 2. Potentially large memory footprint on the requesting executor: in my use case, 700MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all values at once, as it consumes the values via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block is cached in deserialized form, the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU intensive; the memory footprint can be reduced by using a limited buffer for serialization, 'spilling' to the response stream.
> I suggest improving this either by implementing a full streaming mechanism or some kind of pagination mechanism. In addition, the requesting executor should be able to make progress with the data it already has, blocking only when the local buffer is exhausted and the remote side hasn't yet delivered the next chunk of the stream (or page, in case of pagination).
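The pagination idea suggested in the issue can be modelled with generators. The sketch below is plain Python and makes no claim about BlockManager's actual API: `serve_block_in_chunks`, `iterate_remote_values` and `traced` are hypothetical names, and the "network" is just one generator feeding another. It shows that the consumer gets its first value after a single chunk has been produced, rather than after the whole block.

```python
from typing import Iterator, List


def serve_block_in_chunks(values: List[int], chunk_size: int) -> Iterator[List[int]]:
    """Serving side: yield bounded pages instead of one buffer holding the whole block."""
    for start in range(0, len(values), chunk_size):
        yield values[start:start + chunk_size]


def iterate_remote_values(chunks: Iterator[List[int]]) -> Iterator[int]:
    """Requesting side: hand out values as soon as the chunk containing them arrives."""
    for chunk in chunks:  # waits only when the local chunk is exhausted
        yield from chunk


chunks_served: List[int] = []


def traced(chunks: Iterator[List[int]]) -> Iterator[List[int]]:
    """Record the size of every chunk that actually crosses the 'wire'."""
    for chunk in chunks:
        chunks_served.append(len(chunk))
        yield chunk


block = list(range(10))
values = iterate_remote_values(traced(serve_block_in_chunks(block, chunk_size=4)))

first_value = next(values)                     # needs only the first chunk
chunks_after_first_value = len(chunks_served)  # how many chunks were produced so far
remaining = list(values)                       # drains the remaining chunks lazily
```

At any moment each side holds at most `chunk_size` values, which is the bounded-buffer property that drawbacks 2 and 4 in the issue ask for.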
[jira] [Commented] (SPARK-17403) Fatal Error: Scan cached strings
[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682591#comment-16682591 ]

Eyal Farago commented on SPARK-17403:

[~paul_lysak], [~hvanhovell], please note that the exception happens when building the column buffers, hence the corrupt string comes from upstream; in the repro code this is the join, not necessarily an in-memory relation.

[~hvanhovell], I suspect that the upstream operator somehow overwrites the row/buffer returned from the Iterator.next method while the downstream code (the in-memory column building code) doesn't take the necessary precaution of copying it. If this is the case, I suspect the issue is related to code generation in either the join or the InMemoryTableScanExec operator.

Another option: [~paul_lysak] mentioned that the problem happens when the application is 'big enough'. Given that DataSet's default persistence level is _memory and disk_, is it possible that Spark starts evicting these blocks from memory and then bad things start happening?

> Fatal Error: Scan cached strings
>
> Key: SPARK-17403
> URL: https://issues.apache.org/jira/browse/SPARK-17403
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Environment: Spark standalone cluster (3 Workers, 47 cores)
> Ubuntu 14
> Java 8
> Reporter: Ruben Hernando
> Priority: Major
>
> The process creates views from JDBC (SQL server) source and combines them to create other views.
> Finally it dumps results via JDBC > Error: > {quote} # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build > 1.8.0_101-b13) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode > linux-amd64 ) > # Problematic frame: > # J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J > (9 bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc] > #{quote} > SQL Query plan (fields truncated): > {noformat} > == Parsed Logical Plan == > 'Project [*] > +- 'UnresolvedRelation `COEQ_63` > == Analyzed Logical Plan == > InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int > Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, > priceId#20244] > +- SubqueryAlias coeq_63 >+- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS > price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] > +- SubqueryAlias 6__input > +- > Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON# > .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], > [SLTables]. ... [...] FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables > TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where > _TP = 24) input) > def pers > == Optimized Logical Plan == > Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS > price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] > +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, > _TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, > StorageLevel(disk, memory, deserialized, 1 replicas) >: +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], > [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], > [SLTables].[Name], [SLTables].[TableP_DCID], [SLTables].[TableSHID], > [TPSLTables].[SL_ACT_GI_DTE], ... 
[...] FROM [sch].[SLTables] [SLTables] > JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = > [SLTables].[_TableSL_SID] where _TP = 24) input) > [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,... > 36 more fields] > == Physical Plan == > *Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS > price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] > +- InMemoryTableScan [_TableSL_SID#143L, SL_RD_ColR_N#189] >: +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, > _TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, > StorageLevel(disk, memory, deserialized, 1 replicas) >: : +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], > [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], > [SLTables].[Name], [SLTables].[TableP_DCID], ... [...] FROM [sch].[SLTables] > [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = > [SLTables].[_TableSL_SID] where _TP = 24) input) > [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,,... > 36 more fields] > {noformat} -- This message was
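The failure mode suspected in the comment on this issue, an upstream iterator that reuses its row buffer while the downstream consumer keeps references without copying, is easy to reproduce in miniature. The sketch below is plain Python with hypothetical names (`reusing_iterator`); it illustrates the suspected mechanism only and is not a confirmed diagnosis of SPARK-17403.

```python
from typing import Iterator, List


def reusing_iterator(rows: List[List[str]]) -> Iterator[List[str]]:
    """Upstream operator that recycles one mutable buffer for every row it returns."""
    buffer: List[str] = []
    for row in rows:
        buffer.clear()
        buffer.extend(row)
        yield buffer  # the very same object on every call to next()


source = [["a", "1"], ["b", "2"], ["c", "3"]]

# Downstream keeps references without copying: every entry aliases the one buffer,
# so earlier rows are silently overwritten by later ones.
kept_without_copy = [row for row in reusing_iterator(source)]

# Downstream copies each row before holding on to it, so the contents survive.
kept_with_copy = [list(row) for row in reusing_iterator(source)]
```

In this toy version the damage is merely wrong data; with off-heap buffers and raw memory reads the analogous mistake could plausibly surface as a crash instead.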
[jira] [Updated] (SPARK-17403) Fatal Error: Scan cached strings
[ https://issues.apache.org/jira/browse/SPARK-17403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eyal Farago updated SPARK-17403: Description: The process creates views from JDBC (SQL server) source and combines them to create other views. Finally it dumps results via JDBC Error: {quote} # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 1.8.0_101-b13) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode linux-amd64 ) # Problematic frame: # J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc] #{quote} SQL Query plan (fields truncated): {noformat} == Parsed Logical Plan == 'Project [*] +- 'UnresolvedRelation `COEQ_63` == Analyzed Logical Plan == InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, priceId#20244] +- SubqueryAlias coeq_63 +- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] +- SubqueryAlias 6__input +- Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON# .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], [SLTables]. ... [...] FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP = 24) input) def pers == Optimized Logical Plan == Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, _TableSH_SID#145L, ID#146, Name#147, ... 
36 more fields], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) : +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], [SLTables].[Name], [SLTables].[TableP_DCID], [SLTables].[TableSHID], [TPSLTables].[SL_ACT_GI_DTE], ... [...] FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP = 24) input) [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,... 36 more fields] == Physical Plan == *Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] +- InMemoryTableScan [_TableSL_SID#143L, SL_RD_ColR_N#189] : +- InMemoryRelation [_TableSL_SID#143L, _TableP_DC_SID#144L, _TableSH_SID#145L, ID#146, Name#147, ... 36 more fields], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas) : : +- *Scan JDBCRelation((select [SLTables].[_TableSL_SID], [SLTables].[_TableP_DC_SID], [SLTables].[_TableSH_SID], [SLTables].[ID], [SLTables].[Name], [SLTables].[TableP_DCID], ... [...] FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP = 24) input) [_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,,... 36 more fields] {noformat} was: The process creates views from JDBC (SQL server) source and combines them to create other views. 
Finally it dumps results via JDBC Error: {quote} # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build 1.8.0_101-b13) # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.101-b13 mixed mode linux-amd64 ) # Problematic frame: # J 4895 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x7fbb355dfd6c [0x7fbb355dfd60+0xc] # {quote} SQL Query plan (fields truncated): {noformat} == Parsed Logical Plan == 'Project [*] +- 'UnresolvedRelation `COEQ_63` == Analyzed Logical Plan == InstanceId: bigint, price: double, ZoneId: int, priceItemId: int, priceId: int Project [InstanceId#20236L, price#20237, ZoneId#20239, priceItemId#20242, priceId#20244] +- SubqueryAlias coeq_63 +- Project [_TableSL_SID#143L AS InstanceId#20236L, SL_RD_ColR_N#189 AS price#20237, 24 AS ZoneId#20239, 6 AS priceItemId#20242, 63 AS priceId#20244] +- SubqueryAlias 6__input +- Relation[_TableSL_SID#143L,_TableP_DC_SID#144L,_TableSH_SID#145L,ID#146,Name#147,TableP_DCID#148,TableSHID#149,SL_ACT_GI_DTE#150,SL_Xcl_C#151,SL_Xcl_C#152,SL_Css_Cojs#153L,SL_Config#154,SL_CREATEDON# .. 36 more fields] JDBCRelation((select [SLTables].[_TableSL_SID], [SLTables]. ... [...] FROM [sch].[SLTables] [SLTables] JOIN sch.TPSLTables TPSLTables ON [TPSLTables].[_TableSL_SID] = [SLTables].[_TableSL_SID] where _TP = 24) input) == Optimized Logical Plan == Pro
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679840#comment-16679840 ]

Eyal Farago commented on SPARK-24437:

[~dvogelbacher], I think what you actually want is somewhat described in SPARK-17556, which seems to be stuck at the moment.

> Memory leak in UnsafeHashedRelation
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: gagan taneja
> Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 2018-05-30 at 2.07.22 PM.png, Screen Shot 2018-11-01 at 10.38.30 AM.png
>
> There seems to be a memory leak with org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS.
> With each query execution requiring a Broadcast Join, UnsafeHashedRelation is getting added for cleanup in ContextCleaner. This reference to UnsafeHashedRelation is being held in some other Collection and does not become eligible for GC, and because of this ContextCleaner is not able to clean it.
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679560#comment-16679560 ]

Eyal Farago commented on SPARK-24437:

[~dvogelbacher], what about the _checkpoint_ approach?

Another possibility: if the queries are actually rather small, can you force them into memory, then convert them into _DataSet_s and cache these? This way you get rid of the broadcasts and lineage completely, effectively storing just what you need. This still has a minor drawback, as your DataSets are now built on top of a parallelized collection RDD, which still has a memory footprint in the driver's heap.

Re. your question about why the broadcast is being kept as part of the lineage: it would require a long trip down the rabbit hole to understand the way the plan is transformed and represented once it is cached... As you wrote yourself, this is a rather unusual use case, so it might require unusual handling on your side...
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675719#comment-16675719 ]

Eyal Farago commented on SPARK-24437:

[~dvogelbacher], this is a bit puzzling. Spark will usually choose to broadcast a relation if it's small enough; is this not the case here? I'd usually expect the join to be larger than its components. Can you specify the sizes here? How many queries are cached at any given moment?

A few workarounds I can think of: tweak the broadcast threshold configuration key. I don't remember the exact details, but I think you can disable it altogether. Another option might be the _checkpoint_ operation, which trades disk space for truncated lineage. According to the scaladocs it's 'evolving' and available since Spark 2.1; it might be worth a try.
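The two workarounds mentioned above might look roughly like this from PySpark. This is a sketch, not a tested recipe for the reporter's setup: `apply_workarounds` is a hypothetical helper, `checkpoint_dir` is a placeholder chosen by the caller, and the function deliberately takes the session and DataFrame as arguments so it does not import pyspark itself. The configuration key `spark.sql.autoBroadcastJoinThreshold` (with `-1` disabling size-based automatic broadcast joins), `SparkContext.setCheckpointDir` and `DataFrame.checkpoint` are the calls it assumes.

```python
def apply_workarounds(spark, df, checkpoint_dir):
    """Disable automatic broadcast joins and truncate the lineage of `df`.

    `spark` is expected to be a pyspark.sql.SparkSession and `df` a DataFrame;
    `checkpoint_dir` is a placeholder path supplied by the caller.
    """
    # -1 turns off size-based automatic broadcast joins for subsequently planned queries.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    # checkpoint() requires a checkpoint directory on the SparkContext.
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    # Returns a new DataFrame backed by the checkpointed data, with truncated lineage.
    return df.checkpoint()
```

Either step can be used on its own; the threshold only affects how joins are planned, while the checkpoint trades disk space for a shorter lineage.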
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675347#comment-16675347 ]

Eyal Farago commented on SPARK-24437:

[~mgaido], I think we agree on most of the details :)

I do think that once the plan has been executed, keeping the broadcast around is kind of a waste, as this data is effectively cached as part of the cached relation. On the other hand, losing the broadcast means losing the ability to recover from node failures (Spark relies on the RDD lineage for that), so I guess it's the least bad way of caching this relation.

BTW, [~dvogelbacher], what's consuming most of the memory in your scenario? Is it the broadcast variables or the cached relations? I'd expect the latter to be much heavier in most use cases.
[jira] [Commented] (SPARK-25548) In the PruneFileSourcePartitions optimizer, replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps) to make the partition can be pruned
[ https://issues.apache.org/jira/browse/SPARK-25548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673957#comment-16673957 ]

Eyal Farago commented on SPARK-25548:

[~eaton], I think there are two possible approaches to handle this.

The first would be extracting the partition predicate and _And_ing it with the original predicate:
{code:java}
select * from src_par where (p_d in (2,3)) and ((p_d=2 and key=2) or (p_d=3 and key=3))
{code}
The second approach would be transforming this into a union:
{code:java}
select * from src_par where (p_d=2 and key=2)
UNION ALL
select * from src_par where (p_d=3 and key=3)
{code}
I think the second approach is easier to implement, but it'd require additional rules to make sure partitions are not scanned multiple times, e.g. consider what'd happen if your predicate looked like this:
{code:java}
(p_d=2 and key=2) or (p_d=3 and key=3) or (p_d=2 and key=33)
{code}
A naive approach would scan partition #2 twice, while it's pretty obvious this can be avoided by _OR_ing the first and third conditions.

The first approach seems a bit more complicated, but I think it somewhat resembles what you've started implementing in your PR. [~cloud_fan], your thoughts?
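The first approach from the comment above (deriving a partition-only predicate and ANDing it with the original one) can be sketched in plain Scala. This is only an illustration under stated assumptions, not the code of PruneFileSourcePartitions or of the PR: the `Pred` AST and the `partitionOnly` helper are hypothetical names, and the sketch handles only equality, AND and OR (no negation), which is what makes weakening a sub-predicate to `True` safe.

```scala
object PartitionPredicateSketch {
  // Hypothetical, minimal predicate AST (not Catalyst expressions).
  sealed trait Pred
  final case class Eq(column: String, value: Int) extends Pred
  final case class And(left: Pred, right: Pred) extends Pred
  final case class Or(left: Pred, right: Pred) extends Pred
  case object True extends Pred

  // Keeps only the parts of `p` that mention partition columns. Everything else
  // is weakened to True, so the result is implied by `p` and never stricter.
  def partitionOnly(p: Pred, partitionCols: Set[String]): Pred = p match {
    case e @ Eq(c, _) => if (partitionCols(c)) e else True
    case And(l, r) =>
      (partitionOnly(l, partitionCols), partitionOnly(r, partitionCols)) match {
        case (True, x) => x
        case (x, True) => x
        case (x, y)    => And(x, y)
      }
    case Or(l, r) =>
      (partitionOnly(l, partitionCols), partitionOnly(r, partitionCols)) match {
        // If either branch says nothing about partitions, neither does the OR.
        case (True, _) | (_, True) => True
        case (x, y)                => Or(x, y)
      }
    case True => True
  }

  def main(args: Array[String]): Unit = {
    // (p_d=2 and key=2) or (p_d=3 and key=3)
    val original =
      Or(And(Eq("p_d", 2), Eq("key", 2)), And(Eq("p_d", 3), Eq("key", 3)))
    val pruning = partitionOnly(original, Set("p_d"))
    // The pruning predicate is ANDed with the original, as in the first approach.
    println(And(pruning, original))
  }
}
```

For the predicate from the issue, `pruning` comes out as `p_d=2 or p_d=3`, which is enough to skip partition `p_d=4` while the original predicate is still evaluated on the remaining rows.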
> In the PruneFileSourcePartitions optimizer, replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps) to make the partition can be pruned
> ---
>
> Key: SPARK-25548
> URL: https://issues.apache.org/jira/browse/SPARK-25548
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: eaton
> Assignee: Apache Spark
> Priority: Critical
>
> In the PruneFileSourcePartitions optimizer, the partition files will not be pruned if we use partition filter and non partition filter together, for example:
> sql("CREATE TABLE IF NOT EXISTS src_par (key INT, value STRING) partitioned by(p_d int) stored as parquet ")
> sql("insert overwrite table src_par partition(p_d=2) select 2 as key, '4' as value")
> sql("insert overwrite table src_par partition(p_d=3) select 3 as key, '4' as value")
> sql("insert overwrite table src_par partition(p_d=4) select 4 as key, '4' as value")
> The sql below will scan all the partition files, in which, the partition **p_d=4** should be pruned.
> **sql("select * from src_par where (p_d=2 and key=2) or (p_d=3 and key=3)").show**
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673949#comment-16673949 ]

Eyal Farago commented on SPARK-24437:

[~dvogelbacher], I haven't looked too deep into this, but here are two immediate conclusions from the screenshot you've attached:
# The broadcast is referenced from MapPartitionsRDD's f member; this seems reasonable for a broadcast join.
# The entire thing is cached (CachedRDDBuilder). Is it possible you're caching this Dataset? Unlike RDDs, a Dataset's persistence is manually managed, hence it is not automatically garbage collected once the last reference is dropped.

Having said that, I'd still expect Spark to cache only the in-memory representation and not the entire RDD lineage, so this does look like some sort of a bug, something like an over-capturing function/closure in the caching code.
[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables
[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614683#comment-16614683 ]

Eyal Farago commented on SPARK-24410:

[~viirya], I see that the PR is now closed (postponed) due to concerns about a performance regression when partitioning is not required by parent operators.

I think this can be solved by introducing a rule that identifies an Exchange over a Union whose children already support the required partitioning. In this case the union operator can be replaced by a partitioning-aware union operator that merges the partitions (basically implementing your approach as presented in the PR).

I think this approach limits the regression mentioned in the pull request and improves performance when a parent operator requires a matching partitioning. Do you think this approach also has to be governed by a cost-based decision?

> Missing optimization for Union on bucketed tables
> ---
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Ohad Raviv
> Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily increments that we need to further aggregate. We do this by unioning the two tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the tables being bucketed (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {code}
[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables
[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614678#comment-16614678 ]

Eyal Farago commented on SPARK-24410:

[~viirya], I opened SPARK-25203 because of your answer on August 13th. However, looking at your PR, it seems you opted to solve the general case, and a bucketed table is just a special case of it.

Do you think SPARK-25203 should be closed as a duplicate of this issue? If so, can you please add the example used there to this issue as well?
[jira] [Commented] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)
[ https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589356#comment-16589356 ]

Eyal Farago commented on SPARK-25203:

It seems I was wrong regarding the resulting distribution. I modified the query a bit:
{code:java}
select *, spark_partition_id() as P from (select * from t1DU distribute by c1)
-- !query 15 schema
struct
-- !query 15 output
1 a 43
2 a 174
2 b 174
3 b 51
{code}
It's now obvious that records are properly clustered. (I guess the original query computed the partition id BEFORE the repartitioning.)
[jira] [Commented] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)
[ https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589332#comment-16589332 ]

Eyal Farago commented on SPARK-25203:

CC: [~hvanhovell], [~cloud_fan]
[jira] [Created] (SPARK-25203) spark sql, union all does not propagate child partitioning (when possible)
Eyal Farago created SPARK-25203:
---
Summary: spark sql, union all does not propagate child partitioning (when possible)
Key: SPARK-25203
URL: https://issues.apache.org/jira/browse/SPARK-25203
Project: Spark
Issue Type: Bug
Components: Optimizer, SQL
Affects Versions: 2.3.0, 2.2.0, 2.4.0
Reporter: Eyal Farago

In spark-sql, union all does not propagate partitioning when all child plans have the same partitioning. This causes the introduction of unnecessary Exchange nodes when a parent operator requires a distribution satisfied by this partitioning.

{code:java}
CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by c1;
CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 distribute by c11;
create or REPLACE TEMPORARY VIEW t1DU as
select * from t1D1
UNION ALL
select * from t1D2;
EXPLAIN select * from t1DU distribute by c1;
== Physical Plan ==
Exchange hashpartitioning(c1#x, 200)
+- Union
   :- Exchange hashpartitioning(c1#x, 200)
   :  +- LocalTableScan [c1#x, c2#x]
   +- Exchange hashpartitioning(c11#x, 200)
      +- LocalTableScan [c11#x, c2#x]
{code}
The Exchange introduced in the last query is unnecessary since the unioned data is already partitioned by column _c1_. In fact, the equivalent RDD operation identifies this scenario and introduces a PartitionerAwareUnionRDD, which maintains the children's shared partitioner.

I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding _outputPartitioning_ in a way that identifies a common partitioning among the child plans and uses it (falling back to the default implementation otherwise).

Furthermore, it seems the current implementation does not properly cluster data:
{code:java}
select *, spark_partition_id() as P from t1DU distribute by c1
-- !query 15 schema
struct
-- !query 15 output
1 a 43
2 a 374
2 b 174
3 b 251
{code}
Notice _c1=2_ in partitions 174 and 374.
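The suggested _outputPartitioning_ logic (use the children's common partitioning, otherwise fall back) can be modelled in plain Scala. This is a sketch under stated assumptions rather than a patch to UnionExec: `HashByOrdinals` and `Unknown` are hypothetical stand-ins for Spark's partitioning classes, and partitioning columns are identified by their ordinal position in the union's output, on the assumption that this is how _c1_ and _c11_ from the example would be recognised as the same output column.

```scala
object UnionPartitioningSketch {
  // Hypothetical stand-ins, not Spark's Partitioning hierarchy.
  sealed trait Partitioning
  // Hash partitioning on the output columns at the given ordinal positions.
  final case class HashByOrdinals(ordinals: Seq[Int], numPartitions: Int)
      extends Partitioning
  final case class Unknown(numPartitions: Int) extends Partitioning

  private def numPartitionsOf(p: Partitioning): Int = p match {
    case HashByOrdinals(_, n) => n
    case Unknown(n)           => n
  }

  // If every child is hash partitioned on the same output positions with the
  // same number of partitions, the union can report that partitioning.
  // Otherwise this model falls back to "unknown" over the concatenated partitions.
  def unionOutputPartitioning(children: Seq[Partitioning]): Partitioning =
    children.headOption match {
      case Some(first: HashByOrdinals) if children.tail.forall(_ == first) =>
        first
      case _ =>
        Unknown(children.map(numPartitionsOf).sum)
    }

  def main(args: Array[String]): Unit = {
    // t1D1 is distributed by c1 and t1D2 by c11; both are output column 0.
    val t1D1 = HashByOrdinals(Seq(0), 200)
    val t1D2 = HashByOrdinals(Seq(0), 200)
    println(unionOutputPartitioning(Seq(t1D1, t1D2)))
    println(unionOutputPartitioning(Seq(t1D1, Unknown(8))))
  }
}
```

Reporting the shared partitioning is only valid if the union also merges the matching child partitions pairwise (as PartitionerAwareUnionRDD does for RDDs) instead of concatenating them, so a real change would have to touch execution as well as the reported partitioning.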
[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables
[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578789#comment-16578789 ]

Eyal Farago commented on SPARK-24410:

[~viirya], my bad :)

It seems there are two distinct issues here: one is the general behavior of join/aggregate over unions, the other is the guarantees of bucketed partitioning.

Looking more carefully at the results of your query, it seems that the two DFs are not co-partitioned (which is a bit surprising), so my apologies.

Having said that, there's a more general issue with pushing down shuffle-related operations over a union. Do you guys think this deserves a separate issue?
[jira] [Commented] (SPARK-25103) CompletionIterator may delay GC of completed resources
[ https://issues.apache.org/jira/browse/SPARK-25103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578519#comment-16578519 ]

Eyal Farago commented on SPARK-25103:

CC: [~cloud_fan], [~hvanhovell]
[jira] [Created] (SPARK-25103) CompletionIterator may delay GC of completed resources
Eyal Farago created SPARK-25103:
---
Summary: CompletionIterator may delay GC of completed resources
Key: SPARK-25103
URL: https://issues.apache.org/jira/browse/SPARK-25103
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.3.0, 2.2.0, 2.1.0, 2.0.1
Reporter: Eyal Farago

While working on SPARK-22713, I found (and partially fixed) a scenario in which an iterator is already exhausted but still holds a reference to some resources that could be GCed at this point. However, these resources cannot be GCed because of this reference.

The specific fix applied in SPARK-22713 was to wrap the iterator with a CompletionIterator that cleans it up when exhausted. The thing is that it's quite easy to get this wrong by closing over local variables or the _this_ reference in the cleanup function itself.

I propose solving this by modifying CompletionIterator to discard its references to the wrapped iterator and the cleanup function once exhausted.

* A dive into the code showed that most CompletionIterators are eventually used by
{code:java}
org.apache.spark.scheduler.ShuffleMapTask#runTask{code}
which does:
{code:java}
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]){code}
Looking at the
{code:java}
org.apache.spark.shuffle.ShuffleWriter#write{code}
implementations, it seems all of them first exhaust the iterator and then perform some kind of post-processing, e.g. merging spills, sorting, writing partition files and then concatenating them into a single file. Bottom line: the iterator may actually be 'sitting' for some time after being exhausted.
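The proposal above (drop the references to the wrapped iterator and the cleanup function once exhausted) can be sketched as a standalone Scala class. This is a minimal illustration of the idea, not Spark's CompletionIterator and not the fix that was eventually merged; `ReleasingCompletionIterator` is a hypothetical name.

```scala
object ReleasingIteratorSketch {
  // Wraps an iterator, runs `onCompletion` exactly once when the wrapped
  // iterator is exhausted, and then drops both references so that whatever
  // they keep alive can be garbage collected even if this wrapper lingers.
  final class ReleasingCompletionIterator[A](
      private var underlying: Iterator[A],
      private var onCompletion: () => Unit)
      extends Iterator[A] {

    override def hasNext: Boolean = {
      if (underlying == null) {
        false // already completed and released
      } else if (underlying.hasNext) {
        true
      } else {
        val callback = onCompletion
        // Release before invoking, so the references are gone even if the callback throws.
        underlying = null
        onCompletion = null
        callback()
        false
      }
    }

    override def next(): A =
      if (hasNext) underlying.next()
      else throw new NoSuchElementException("iterator is exhausted")
  }

  def main(args: Array[String]): Unit = {
    var cleanups = 0
    val it = new ReleasingCompletionIterator[Int](Iterator(1, 2, 3), () => cleanups += 1)
    it.foreach(println)
    println(s"cleanup calls: $cleanups")
  }
}
```

With this shape, a caller such as a shuffle writer that exhausts the iterator and then spends time on post-processing no longer pins the wrapped iterator or anything the cleanup closure captured.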
[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables
[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578489#comment-16578489 ]

Eyal Farago commented on SPARK-24410:

[~viirya], I think your conclusion about co-partitioning is wrong. Consider the following code segment from your comment:
{code:java}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
This prints the partition ids as assigned by the union. Assuming union simply concatenates the partitions from df1 and df2, assigning them running ids, it really makes sense that you'd get two partitions per key: one coming from df1 and the other from df2.

Applying this select on each dataframe separately, you'd get the exact same results, meaning a given key will have the same partition id in both dataframes.

I think this code fragment basically shows what's wrong with the current implementation of Union, not that we can't optimize unions of co-partitioned relations.

If union were a bit more 'partitioning aware', it'd be able to identify that both children have the same partitioning scheme and 'inherit' it. As you actually showed, this might be a bit tricky, as the same logical attribute from different children has a different expression id. But Union eventually maps these child attributes into a single output attribute, so this information can be used to resolve the partitioning columns and determine their equality.

Furthermore, Union being smarter about its output partitioning won't cut it; a few rules have to be added/modified:
1. Applying an exchange on a union should sometimes be pushed to the children (the children can be partitioned into those supporting the required partitioning and those not supporting it; the exchange can then be applied to a union of the non-supporting children, and the result unioned with the rest of the children).
2. Partial aggregation also has to be pushed to the children, resulting in a union of partial aggregations. Again, it's possible to partition the children according to their support of the required partitioning.
3. Final aggregation over a union introduces an exchange, which will then be pushed to the children. The aggregation is then applied on top of the partitioning-aware union (think of the way PartitionerAwareUnionRDD handles partitioning).

* partition children = partitioning an array by a predicate (scala.collection.TraversableLike#partition)
* other operators like join may require additional rules.
* some of these ideas were discussed offline with [~hvanhovell]

> Missing optimization for Union on bucketed tables
> ---
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Ohad Raviv
> Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily increments that we need to further aggregate. We do this by unioning the two tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it seems that the union operator doesn't leverage the tables being bucketed (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters:
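The "partition children" step from the SPARK-24410 comment above can be sketched in plain Scala. This is only an illustration under stated assumptions: `Child`, `Required`, `UnionPushdownSketch`, and the plan-shape strings are hypothetical names invented here, not Spark classes or an actual planner rule.

```scala
// Hypothetical, simplified model of the idea; none of these are Spark classes.
final case class Child(name: String, partitionedBy: Seq[String], numPartitions: Int)
final case class Required(columns: Seq[String], numPartitions: Int)

object UnionPushdownSketch {
  // A child "supports" the requirement if it is already partitioned the same way.
  def supports(child: Child, required: Required): Boolean =
    child.partitionedBy == required.columns && child.numPartitions == required.numPartitions

  // Seq#partition splits by a predicate: (supporting, nonSupporting).
  def split(children: Seq[Child], required: Required): (Seq[Child], Seq[Child]) =
    children.partition(c => supports(c, required))

  // Describes the rewritten plan shape: a single exchange over the union of the
  // non-supporting children, unioned with the children that are left untouched.
  def rewrite(children: Seq[Child], required: Required): String = {
    val (ok, needExchange) = split(children, required)
    val exchangedNames = needExchange.map(_.name).mkString(", ")
    val exchanged: Seq[String] =
      if (needExchange.isEmpty) Seq.empty
      else Seq("Exchange(Union(" + exchangedNames + "))")
    val inputs = (ok.map(_.name) ++ exchanged).mkString(", ")
    "PartitioningAwareUnion(" + inputs + ")"
  }
}
```

With two tables bucketed by `key` into 3 buckets and one unbucketed increment, only the increment would need a shuffle in this model; the bucketed children keep their layout.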
[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager
[ https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578322#comment-16578322 ] Eyal Farago commented on SPARK-22713: - [~jerrylead], can you please test this? > OOM caused by the memory contention and memory leak in TaskMemoryManager > > > Key: SPARK-22713 > URL: https://issues.apache.org/jira/browse/SPARK-22713 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.1.1, 2.1.2 >Reporter: Lijie Xu >Assignee: Eyal Farago >Priority: Critical > Fix For: 2.4.0 > > > The pdf version of this issue with high-quality figures is available at > https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf. > *[Abstract]* > I recently encountered an OOM error in a PageRank application > (_org.apache.spark.examples.SparkPageRank_). After profiling the application, > I found the OOM error is related to the memory contention in shuffle spill > phase. Here, the memory contention means that a task tries to release some > old memory consumers from memory for keeping the new memory consumers. After > analyzing the OOM heap dump, I found the root cause is a memory leak in > _TaskMemoryManager_. Since memory contention is common in shuffle phase, this > is a critical bug/defect. In the following sections, I will use the > application dataflow, execution log, heap dump, and source code to identify > the root cause. > *[Application]* > This is a PageRank application from Spark’s example library. The following > figure shows the application dataflow. The source code is available at \[1\]. > !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%! > *[Failure symptoms]* > This application has a map stage and many iterative reduce stages. An OOM > error occurs in a reduce task (Task-28) as follows. 
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/Stage.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/task.png?raw=true|width=100%!
>
> *[OOM root cause identification]*
> Each executor has 1 CPU core and 6.5GB memory, so it only runs one task at a time. After analyzing the application dataflow, error log, heap dump, and source code, I found that the following steps lead to the OOM error.
> => The MemoryManager found that there is not enough memory to cache the _links:ShuffledRDD_ (rdd-5-28, red circles in the dataflow figure).
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/ShuffledRDD.png?raw=true|width=100%!
> => The task needs to shuffle twice (1st shuffle and 2nd shuffle in the dataflow figure).
> => The task needs to generate two _ExternalAppendOnlyMap_ instances (E1 for the 1st shuffle and E2 for the 2nd shuffle) in sequence.
> => The 1st shuffle begins and ends. E1 aggregates all the shuffled data of the 1st shuffle and reaches 3.3 GB.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/FirstShuffle.png?raw=true|width=100%!
> => The 2nd shuffle begins. E2 starts aggregating the shuffled data of the 2nd shuffle and finds that there is not enough memory left. This triggers the memory contention.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/SecondShuffle.png?raw=true|width=100%!
> => To handle the memory contention, the _TaskMemoryManager_ releases E1 (spills it onto disk) and assumes that the 3.3GB space is free now.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/MemoryContention.png?raw=true|width=100%!
> => E2 continues to aggregate the shuffled records of the 2nd shuffle. However, E2 encounters an OOM error while shuffling.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMbefore.png?raw=true|width=100%!
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/OOMError.png?raw=true|width=100%!
> *[Guess]*
> The task memory usage below shows no drop in memory. So, the cause may be that the 3.3GB _ExternalAppendOnlyMap_ (E1) is not actually released by the _TaskMemoryManager_.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/GCFigure.png?raw=true|width=100%!
> *[Root cause]*
> After analyzing the heap dump, I found the guess is right (the 3.3GB _ExternalAppendOnlyMap_ is actually not released). The 1.6GB object is _ExternalAppendOnlyMap (E2)_.
> !https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/figures/heapdump.png?raw=true|width=100%!
> *[Question]*
> Why is the released _ExternalAppendOnlyMap_ still in memory?
>
[jira] [Commented] (SPARK-22286) OutOfMemoryError caused by memory leak and large serializer batch size in ExternalAppendOnlyMap
[ https://issues.apache.org/jira/browse/SPARK-22286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481961#comment-16481961 ]

Eyal Farago commented on SPARK-22286:

[~jerrylead], don't you think the actual root cause here is the use of Java serialization? Anyway, that's what I gather from looking at the last image, the one showing ObjectInputStream$HandleTable summing up to 400MB.

In the 1st action item of the potential solution, do you mean removing items from the map as the iterator makes progress? I'm not sure that'll make much of a difference here; in any case this involves some bookkeeping, as the memory has to be reported back to the TaskMemoryManager (-which seems not to be informed of any allocation by this class- which is informed of memory allocation via calls to maybeSpill() from insertAll()).

One thing that troubles me about this class is the fact that it keeps a potentially large buffer per spilled map. I'm referring to:
{code:java}
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
{code}
where each DiskMapIterator has a buffer large enough to store a batch of records (1 by default); this, combined with the fact that this buffer size is effectively unbounded, may result in a substantially sized buffer (per spill).

Re. action item #3, I think Kryo does the trick. BTW, why use batches at all? This is basically a sequential read deserializing one record at a time, so it makes sense to buffer disk reads, but why batch? (Unless this was designed with Java serialization pitfalls in mind in the first place.)

> OutOfMemoryError caused by memory leak and large serializer batch size in ExternalAppendOnlyMap
>
> Key: SPARK-22286
> URL: https://issues.apache.org/jira/browse/SPARK-22286
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, Spark Core
> Affects Versions: 2.1.1, 2.1.2
> Reporter: Lijie Xu
> Priority: Critical
>
> *[Abstract]*
> I recently encountered an OOM error in a simple _groupByKey_ application.
> After profiling the application, I found the OOM error is related to the > shuffle spill and records (de)serialization. After analyzing the OOM heap > dump, I found the root causes are (1) memory leak in ExternalAppendOnlyMap, > (2) large static serializer batch size (_spark.shuffle.spill.batchSize_ > =1) defined in ExternalAppendOnlyMap, and (3) memory leak in the > deserializer. Since almost all the Spark applications rely on > ExternalAppendOnlyMap to perform shuffle and reduce, this is a critical > bug/defect. In the following sections, I will detail the testing application, > data, environment, failure symptoms, diagnosing procedure, identified root > causes, and potential solutions. > *[Application]* > This is a simple GroupBy application as follows. > {code} > table.map(row => (row.sourceIP[1,7], row)).groupByKey().saveAsTextFile() > {code} > The _sourceIP_ (an IP address like 127.100.101.102) is a column of the > _UserVisits_ table. This application has the same logic as the aggregation > query in Berkeley SQL benchmark (https://amplab.cs.berkeley.edu/benchmark/) > as follows. > {code} > SELECT * FROM UserVisits > GROUP BY SUBSTR(sourceIP, 1, 7); > {code} > The application code is available at \[1\]. > *[Data]* > The UserVisits table size is 16GB (9 columns, 132,000,000 rows) with uniform > distribution. The HDFS block size is 128MB. The data generator is available > at \[2\]. > *[Environment]* > Spark 2.1 (Spark 2.2 may also have this error), Oracle Java Hotspot 1.8.0, 1 > master and 8 workers as follows. > !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Workers.png|width=100%! > This application launched 32 executors. Each executor has 1 core and 7GB > memory. 
The detailed application configuration is > {code} >total-executor-cores = 32 >executor-cores = 1 >executor-memory = 7G >spark.default.parallelism=32 >spark.serializer = JavaSerializer (KryoSerializer also has OOM error) > {code} > *[Failure symptoms]* > This application has a map stage and a reduce stage. An OOM error occurs in a > reduce task (Task-17) as follows. > !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Stage.png|width=100%! > !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/Tasks.png|width=100%! > > Task-17 generated an OOM error. It shuffled ~1GB data and spilled 3.6GB data > onto the disk. > Task-17 log below shows that this task is reading the next record by invoking > _ExternalAppendOnlyMap.hasNext_(). From the OOM stack traces and the above > shuffle metrics, we cannot identify the OOM root causes. > !https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/OOM/OOMStackTrace.png|
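The per-spill buffer concern raised in the SPARK-22286 comment above comes down to simple arithmetic, sketched below. It assumes, as the comment suggests, that each spilled map keeps roughly one batch of records buffered while the spills are merged; `SpillBufferEstimate` and the numbers in `main` are hypothetical and are not measurements from the report or values read from Spark.

```scala
object SpillBufferEstimate {
  // Rough upper bound on the bytes held by read buffers while merging spills,
  // assuming every spilled map keeps one full batch of records in memory.
  def bytesHeld(spilledMaps: Int, batchSize: Long, avgRecordBytes: Long): Long =
    spilledMaps.toLong * batchSize * avgRecordBytes

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: 40 spills, 10,000 records per batch, 1 KiB per record.
    val bytes = bytesHeld(spilledMaps = 40, batchSize = 10000L, avgRecordBytes = 1024L)
    println(s"estimated buffered bytes: $bytes")
  }
}
```

Because the estimate is a product of three factors and the number of spills has no fixed limit, neither does the total, which is the point the comment makes about the buffer being effectively unbounded.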
[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager
[ https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481881#comment-16481881 ] Eyal Farago commented on SPARK-22713: - [~jerrylead], what's the relation to SPARK-22286? does solving this at least partially solve that as well? > OOM caused by the memory contention and memory leak in TaskMemoryManager > > > Key: SPARK-22713 > URL: https://issues.apache.org/jira/browse/SPARK-22713 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.1.1, 2.1.2 >Reporter: Lijie Xu >Priority: Critical > > The pdf version of this issue with high-quality figures is available at > https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf. > *[Abstract]* > I recently encountered an OOM error in a PageRank application > (_org.apache.spark.examples.SparkPageRank_). After profiling the application, > I found the OOM error is related to the memory contention in shuffle spill > phase. Here, the memory contention means that a task tries to release some > old memory consumers from memory for keeping the new memory consumers. After > analyzing the OOM heap dump, I found the root cause is a memory leak in > _TaskMemoryManager_. Since memory contention is common in shuffle phase, this > is a critical bug/defect. In the following sections, I will use the > application dataflow, execution log, heap dump, and source code to identify > the root cause. > *[Application]* > This is a PageRank application from Spark’s example library. The following > figure shows the application dataflow. The source code is available at \[1\]. > !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%! > *[Failure symptoms]* > This application has a map stage and many iterative reduce stages. An OOM > error occurs in a reduce task (Task-28) as follows. 
[jira] [Commented] (SPARK-22713) OOM caused by the memory contention and memory leak in TaskMemoryManager
[ https://issues.apache.org/jira/browse/SPARK-22713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481468#comment-16481468 ] Eyal Farago commented on SPARK-22713: - [~jerrylead], excellent investigation and description of the issue, I'll open a PR shortly. > OOM caused by the memory contention and memory leak in TaskMemoryManager > > > Key: SPARK-22713 > URL: https://issues.apache.org/jira/browse/SPARK-22713 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.1.1, 2.1.2 >Reporter: Lijie Xu >Priority: Critical > > The pdf version of this issue with high-quality figures is available at > https://github.com/JerryLead/Misc/blob/master/OOM-TasksMemoryManager/report/OOM-TaskMemoryManager.pdf. > *[Abstract]* > I recently encountered an OOM error in a PageRank application > (_org.apache.spark.examples.SparkPageRank_). After profiling the application, > I found the OOM error is related to the memory contention in shuffle spill > phase. Here, the memory contention means that a task tries to release some > old memory consumers from memory for keeping the new memory consumers. After > analyzing the OOM heap dump, I found the root cause is a memory leak in > _TaskMemoryManager_. Since memory contention is common in shuffle phase, this > is a critical bug/defect. In the following sections, I will use the > application dataflow, execution log, heap dump, and source code to identify > the root cause. > *[Application]* > This is a PageRank application from Spark’s example library. The following > figure shows the application dataflow. The source code is available at \[1\]. > !https://raw.githubusercontent.com/JerryLead/Misc/master/OOM-TasksMemoryManager/figures/PageRankDataflow.png|width=100%! > *[Failure symptoms]* > This application has a map stage and many iterative reduce stages. An OOM > error occurs in a reduce task (Task-28) as follows. 
[jira] [Commented] (SPARK-21109) union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe
[ https://issues.apache.org/jira/browse/SPARK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453625#comment-16453625 ]

Eyal Farago commented on SPARK-21109:

I've also encountered this issue in two separate ways. In one of them I got a cast exception similar to the one [~superwai] encountered; in a slightly different one, however, I got wrong results:
{code:java}
import org.apache.spark.sql.functions

case class DataV1(a: Int)
case class DataV2(a: Int, b: Int)

test("weird spark bug") {
  import sqlContext.implicits._
  val df1 = List(DataV1(1)).toDS().toDF()
  val df2 = List(DataV2(2, 3)).toDS().toDF()
  // resulting column order: a, commitUUID, b
  val df3 = df1
    .withColumn("commitUUID", functions lit 1)
    .withColumn("b", functions lit 42)
  // resulting column order: a, b, commitUUID
  val df4 = df2
    .withColumn("commitUUID", functions lit 2)
    .withColumn("b", functions.coalesce($"b", functions lit 42))
  val u = df3 union df4
  u.as[DataV2].show
}
{code}
and the output:
{code:java}
+---+----------+---+
|  a|commitUUID|  b|
+---+----------+---+
|  1|         1| 42|
|  2|         3|  2|
+---+----------+---+
{code}
You can see that in the second row the value 2 was assigned to column 'b' instead of 'commitUUID', which was assigned b's value (3).

Slightly modifying this example and making the commitUUID field a String results in an exception similar to the one [~superwai] reported:
{code:java}
Cannot up cast `b` from string to int as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
org.apache.spark.sql.AnalysisException: Cannot up cast `b` from string to int as it may truncate
The type path of the target object is:
- field (class: "scala.Int", name: "b")
- root class: "com.nrgene.genomagic.pipeline.spark.DataV2"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
{code}
[~viirya], I've seen [~rxin]'s comment on SPARK-21043 and I read your reasoning in the comments above, and I must say this looks like 'justifying' the original implementation detail. It simply wasn't handled properly when the union method was first introduced; then some people wrote code that relied on the 'by index' mapping, and by then it could no longer be modified... I think you can see why it doesn't make much sense from the user's perspective: as far as she cares, both sides of the union are of the same type. The fact that union can't see it is a bug in its implementation which is now deemed a feature.

If we put aside the backward compatibility issue, then fixing this is as simple as adding an analyzer rule that introduces a project if needed; an alternative might be to reject this use case during analysis. The way I see it, the current state of affairs is greatly misleading.
[~superwai], one way of dealing with this situation (aside from upgrading Spark and using the newly introduced unionByName) is to extract the field order of one side and 'force' it on the other:
{code:java}
import org.apache.spark.sql.functions

// make sure to align the schema: select right's columns in left's column order
val right2 = right.select(left.columns.map(functions.col _) : _*)
left union right2
{code}

> union two dataset[A] don't work as expected if one of the datasets is originated from a dataframe
>
> Key: SPARK-21109
> URL: https://issues.apache.org/jira/browse/SPARK-21109
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.1
> Reporter: Jerry Lam
> Priority: Major
>
> To reproduce the issue:
> {code}
> case class my_case(id0: Long, id1: Int, id2: Int, id3: String)
> val data1 = Seq(my_case(0L, 0, 0, "0")).toDS
> val data2 = Seq(("1", 1, 1, 1L)).toDF("id3", "id1", "id2", "id0").as[my_case]
> data1.show
> +---+---+---+---+
> |id0|id1|id2|id3|
> +---+---+---+---+
> |  0|  0|  0|  0|
> +---+---+---+---+
> data2.show
> +---+---+---+---+
> |id3|id1|id2|id0|
> +---+---+---+---+
> |  1|  1|  1|  1|
> +---+---+---+---+
> data1.union(data2).show
> org.apache.spark.sql.AnalysisException: Cannot up cast `id0` from string to bigint as it may truncate
> The type path of the target object is:
> - field (class: "scala.Long", name: "id0")
> - root class: "my_case"
> You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2123)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2153)
> at org.apache.spark.sql.catalyst.analysis.Analyzer
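The difference between a positional union and the column-alignment workaround from the SPARK-21109 comment above can be modelled without Spark. The sketch below is a toy: `Frame`, `unionByPosition`, and `select` are hypothetical names introduced here to mimic the behaviour described in the thread, not the Dataset API.

```scala
// Toy stand-in for a DataFrame: column names plus rows of values.
// Hypothetical model for illustration only; this is not the Spark API.
final case class Frame(columns: Seq[String], rows: Seq[Seq[Any]]) {

  // Positional union: keeps the left column names and appends the right rows
  // as-is, whatever the right-hand column order happens to be.
  def unionByPosition(other: Frame): Frame =
    Frame(columns, rows ++ other.rows)

  // Reorders columns by name, like selecting one side's columns in the
  // other side's column order.
  def select(names: Seq[String]): Frame = {
    val indices = names.map(n => columns.indexOf(n))
    require(!indices.contains(-1), "unknown column in: " + names.mkString(", "))
    Frame(names, rows.map(row => indices.map(i => row(i))))
  }
}

object UnionAlignmentSketch {
  // Same column orders and values as in the example from the comment.
  val left: Frame = Frame(Seq("a", "commitUUID", "b"), Seq(Seq(1, 1, 42)))
  val right: Frame = Frame(Seq("a", "b", "commitUUID"), Seq(Seq(2, 3, 2)))

  // b's value (3) lands under commitUUID, as in the output shown in the comment.
  val misaligned: Frame = left.unionByPosition(right)

  // Aligning the right side to the left's column order first avoids the swap.
  val aligned: Frame = left.unionByPosition(right.select(left.columns))
}
```

In this model the misaligned second row reads (2, 3, 2) under the columns (a, commitUUID, b), while the aligned one reads (2, 2, 3).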
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358668#comment-16358668 ]

Eyal Farago commented on SPARK-19870:

I'll remember to share relevant future logs😎

Re. the exception code path missing a cleanup: you're definitely right, but I'm less concerned about this one, as this code path is 'reserved' for tasks (I don't think Netty threads ever get to this code), hence cleanup (+warning) is guaranteed.

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Shuffle
> Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, yarn coarse-grained.
> Reporter: Steven Ruppert
> Priority: Major
> Attachments: cs.executor.log, stack.txt
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, with several shuffles, a cached RDD, and finally a conversion to DataFrame to save to parquet. I get a repeatable deadlock at the very last reducers of one of the stages.
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry [0x7fffb95f3000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
>     - waiting to lock <0x0005445cfc00> (a org.apache.spark.broadcast.TorrentBroadcast$)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>     - locked <0x0005b12f2290> (a org.apache.spark.broadcast.TorrentBroadcast)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Object.wait(Object.java:502)
>     at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
>     - locked <0x000545736b58> (a org.apache.spark.storage.BlockInfoManager)
>     at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
>     - locked <0x0005445cfc00> (a org.apache.spark.broadcast.TorrentBroadcast$)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>     - locked <0x00059711eb10> (a org.apache.spark.broadcast.TorrentBroadcast)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at jav
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358458#comment-16358458 ]

Eyal Farago commented on SPARK-19870:

[~irashid], I'm afraid I have no record of which executor got into this hang, so I can't think of a way to find its logs (on top of this, the Spark UI via the history server seems a bit unreliable, i.e. jobs shown as 'running' in the UI are reported as completed in the executor logs).

Can you please share what you're looking for in the executor logs? As you can see in the one I've shared, Spark's logging level is set to WARN, so there's not much in it...

> Repeatable deadlock on BlockInfoManager and TorrentBroadcast
>
> Key: SPARK-19870
> URL: https://issues.apache.org/jira/browse/SPARK-19870
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Shuffle
> Affects Versions: 2.0.2, 2.1.0
> Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, yarn coarse-grained.
> Reporter: Steven Ruppert
> Priority: Major
> Attachments: cs.executor.log, stack.txt
>
> Running what I believe to be a fairly vanilla spark job, using the RDD api, with several shuffles, a cached RDD, and finally a conversion to DataFrame to save to parquet. I get a repeatable deadlock at the very last reducers of one of the stages.
> Roughly: > {noformat} > "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 > tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry > [0x7fffb95f3000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207) > - waiting to lock <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x0005b12f2290> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > and > {noformat} > "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 > tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) > - locked <0x000545736b58> (a > org.apache.spark.storage.BlockInfoManager) > at > 
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) > - locked <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x00059711eb10> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apac
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356992#comment-16356992 ] Eyal Farago commented on SPARK-19870: - [~irashid], attached a sample executor log > Repeatable deadlock on BlockInfoManager and TorrentBroadcast > > > Key: SPARK-19870 > URL: https://issues.apache.org/jira/browse/SPARK-19870 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 2.0.2, 2.1.0 > Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, > yarn coarse-grained. >Reporter: Steven Ruppert >Priority: Major > Attachments: cs.executor.log, stack.txt > > > Running what I believe to be a fairly vanilla spark job, using the RDD api, > with several shuffles, a cached RDD, and finally a conversion to DataFrame to > save to parquet. I get a repeatable deadlock at the very last reducers of one > of the stages. > Roughly: > {noformat} > "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 > tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry > [0x7fffb95f3000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207) > - waiting to lock <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x0005b12f2290> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > and > {noformat} > "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 > tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) > - locked <0x000545736b58> (a > org.apache.spark.storage.BlockInfoManager) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) > - locked <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x00059711eb10> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at 
org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > A full stack trace is attached, but those seem to be the offending threads. > This happens across several
[jira] [Updated] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eyal Farago updated SPARK-19870: Attachment: cs.executor.log > Repeatable deadlock on BlockInfoManager and TorrentBroadcast > > > Key: SPARK-19870 > URL: https://issues.apache.org/jira/browse/SPARK-19870 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 2.0.2, 2.1.0 > Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, > yarn coarse-grained. >Reporter: Steven Ruppert >Priority: Major > Attachments: cs.executor.log, stack.txt > > > Running what I believe to be a fairly vanilla spark job, using the RDD api, > with several shuffles, a cached RDD, and finally a conversion to DataFrame to > save to parquet. I get a repeatable deadlock at the very last reducers of one > of the stages. > Roughly: > {noformat} > "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 > tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry > [0x7fffb95f3000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207) > - waiting to lock <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x0005b12f2290> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > and > {noformat} > "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 > tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) > - locked <0x000545736b58> (a > org.apache.spark.storage.BlockInfoManager) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) > - locked <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x00059711eb10> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > A full stack trace is attached, but those seem to be the offending threads. > This happens across several different executors, and has persisted through > several runs of th
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355174#comment-16355174 ] Eyal Farago commented on SPARK-19870: - [~irashid], I went through the executors' logs and found no errors. I did find some sizeable GC pauses and warnings like this: {quote}WARN Executor: 1 block locks were not released by TID = 5413: [rdd_53_42] {quote} This aligns with my finding that tasks clean up after themselves, theoretically in case of failure as well. Since neither of the distros we're using offers any of the versions that include your fix, I can't test this scenario with your patch (aside from other practical constraints :-)), so I can't really verify my assumption about the link between SPARK-19870 and SPARK-22083. [~stevenruppert], [~gdanov], can you guys try running with one of the relevant versions? > Repeatable deadlock on BlockInfoManager and TorrentBroadcast > > > Key: SPARK-19870 > URL: https://issues.apache.org/jira/browse/SPARK-19870 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 2.0.2, 2.1.0 > Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, > yarn coarse-grained. >Reporter: Steven Ruppert >Priority: Major > Attachments: stack.txt > > > Running what I believe to be a fairly vanilla spark job, using the RDD api, > with several shuffles, a cached RDD, and finally a conversion to DataFrame to > save to parquet. I get a repeatable deadlock at the very last reducers of one > of the stages. 
> Roughly: > {noformat} > "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 > tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry > [0x7fffb95f3000] >java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207) > - waiting to lock <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x0005b12f2290> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > and > {noformat} > "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 > tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) > - locked <0x000545736b58> (a > org.apache.spark.storage.BlockInfoManager) > at > 
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444) > at > org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) > - locked <0x0005445cfc00> (a > org.apache.spark.broadcast.TorrentBroadcast$) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) > at > org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) > at > org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) > - locked <0x00059711eb10> (a > org.apache.spark.broadcast.TorrentBroadcast) > at > org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) > at > org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(Shuff
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353651#comment-16353651 ] Eyal Farago commented on SPARK-19870: - We've also seen something very similar (stack traces attached) with Spark 2.1.0. In our case this happened after several OOM conditions and executors being killed and restarted (a new AWS account was limited on the instance type we chose). I've stared at these stack traces a lot and 'walked along' the block manager (and block info manager) code paths. My conclusion was that a thread leaks a writer lock for at least one block. Further investigation shows that tasks are registered for cleanup, which should eliminate the potential problem (the block info manager maintains maps of readers/writers per block). However, there are other threads living inside a Spark executor, such as the BlockTransferService, that are not tasks and hence have no ID and no guaranteed cleanup... I think I found a potential weak spot in the way Spark evicts blocks from memory (org.apache.spark.storage.memory.MemoryStore#evictBlocksToFreeSpace) that has the potential to keep holding locks in case of failure. SPARK-22083 seems to fix this, so I'd recommend trying out one of the versions including the fix (we're currently checking the possibility of doing so with the distros we're using). [~irashid], you fixed SPARK-22083; do the scenarios referred to in this issue resemble what you were fixing? Does my analysis make sense to you? 
attaching my stack traces: {noformat} Thread ID Thread Name Thread StateThread Locks 148 Executor task launch worker for task 75639 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@155076597}) java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:502) org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450) org.apache.spark.storage.BlockManager.get(BlockManager.scala:636) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) org.apache.spark.rdd.RDD.iterator(RDD.scala:285) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) scala.collection.immutable.List.foreach(List.scala:381) scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) org.apache.spark.scheduler.Task.run(Task.scala:99) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) 168 Executor task launch worker for task 75642 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1540715596}) java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:502) org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202) org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450) org.apache.spark.storage.BlockManager.get(BlockManager.scala:636) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) org.apache.spark.rdd.RDD.iterator(RDD.scala:285) org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141) org.apache.spark.rdd.CoGroup
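To make the hypothesis in the comment above concrete, here is a minimal sketch of a per-block write-lock registry with cleanup keyed by owner ID. It is not Spark's BlockInfoManager; `LockLeakSketch`, `BlockLockRegistry`, the block IDs and the owner IDs are all hypothetical names chosen for illustration. It only shows why cleanup tied to a task ID would not cover a lock taken under some other owner.

```scala
import scala.collection.mutable

// Illustrative only: a simplified stand-in, not Spark's BlockInfoManager.
object LockLeakSketch {
  final class BlockLockRegistry {
    // blockId -> owner currently holding the write lock
    private val writers = mutable.Map.empty[String, Long]

    // Returns true if the lock was acquired, false if another owner holds it.
    def lockForWriting(blockId: String, owner: Long): Boolean = synchronized {
      if (writers.contains(blockId)) false
      else { writers.update(blockId, owner); true }
    }

    // Cleanup hook: releases every write lock held by `owner` and reports them.
    def releaseAllLocksFor(owner: Long): List[String] = synchronized {
      val held = writers.iterator.collect { case (block, o) if o == owner => block }.toList
      held.foreach(b => writers.remove(b))
      held
    }

    def isWriteLocked(blockId: String): Boolean = synchronized(writers.contains(blockId))
  }
}
```

In this sketch, a lock taken by a task is released by that task's cleanup call, while a lock taken under a different owner ID stays held until someone releases it explicitly, so any reader waiting on that block would wait indefinitely.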
[jira] [Commented] (SPARK-18067) SortMergeJoin adds shuffle if join predicates have non partitioned columns
[ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352594#comment-16352594 ] Eyal Farago commented on SPARK-18067: - [~tejasp], this issue and its associated pull request seem to relax the distribution requirements of the top-level join. I have the opposite case, where the join has one key but one of its children uses two keys (including the join key) for aggregation. Spark 2.1 plans this (poorly?) with two exchange operators, one for the aggregate based on both keys and another for the join (based on one key). I think that in this case the planner could relax the distribution requirements of the aggregation (child) and rely on the fact that distributing by one key implies distributing by both keys (with the caveats that data might get skewed, we may need to spill during aggregation, etc.). A few questions: # Is there any specific reason your approach is limited to joins rather than general use cases of 'stacked' exchange operators? # Do you think there's room for a second strategy relaxing the distribution requirements of the inner exchange (the aggregate in my case)? 
I'm also attaching an example plan I'm currently getting (slightly modified): {noformat} == Physical Plan == *Project [v1 AS v1#472, v2#311L AS v2#473L, v3#312L AS v3#474L, key1#355L AS k1#475L, key2#385 AS key2#477] +- *SortMergeJoin [key1#355L], [key1#304L], Inner :- *Sort [key1#355L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(key1#355L, 8) : +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], output=[key1#355L, key2#385]) :+- Exchange hashpartitioning(key1#355L, key2#385, 8) : +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], output=[key1#355L, key2#385]) : +- *Project [key1#355L, UDF(v4#365) AS key2#385] : +- *Filter ((isnotnull(key1#355L) && NOT (key1#355L = -1)) && v4#365 IN (s1,s2,s3,s4)) :+- *FileScan parquet [key1#355L,v4#365,commitUUID#366] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file: mp/loads_only_samples_in_comparison_list5387414497663000634/008d9ea9-a0c..., PartitionCount: 1, PartitionFilters: [commitUUID#366 IN (oldCommit,commit-uuid)], PushedFilters: [IsNotNull(key1), Not(EqualTo(key1,-1)), In(v4, [s,s2,s3..., ReadSchema: struct +- *Sort [key1#304L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(key1#304L, 8) +- *Project [key1#304L, v1, v2#311L, v3#312L] {noformat} notice the two exchanges in the join's first child. 
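The claim that distributing by one key implies distributing by both keys can be checked with a small sketch in plain Scala collections. This is not Spark's planner or partitioner; `PartitioningSketch`, `Row`, `partitionByKey1` and `groupsAreColocated` are hypothetical names, and the hash routing is only a stand-in for `hashpartitioning(key1, n)`.

```scala
// Illustrative only: plain Scala collections, not Spark's planner or partitioner.
object PartitioningSketch {
  final case class Row(key1: Long, key2: String)

  // Hypothetical stand-in for hashpartitioning(key1, n): route each row by key1 alone.
  def partitionByKey1(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Row]] =
    rows.groupBy(r => java.lang.Math.floorMod(r.key1.hashCode, numPartitions))

  // True when every (key1, key2) group is confined to a single partition,
  // which is what an aggregate keyed on (key1, key2) needs from its input.
  def groupsAreColocated(parts: Map[Int, Seq[Row]]): Boolean = {
    val owners = for ((p, rs) <- parts.toSeq; r <- rs) yield ((r.key1, r.key2), p)
    owners.groupBy(_._1).values.forall(_.map(_._2).distinct.size == 1)
  }
}
```

Because rows with equal `key1` always land in the same partition, rows with equal `(key1, key2)` do too, so under this assumption a single exchange on `key1` would satisfy both the aggregate and the join. The cost is the one named in the comment: fewer distinct routing keys means a higher chance of skew.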
> SortMergeJoin adds shuffle if join predicates have non partitioned columns > -- > > Key: SPARK-18067 > URL: https://issues.apache.org/jira/browse/SPARK-18067 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 1.6.1 >Reporter: Paul Jones >Priority: Minor > > Basic setup > {code} > scala> case class Data1(key: String, value1: Int) > scala> case class Data2(key: String, value2: Int) > scala> val partition1 = sc.parallelize(1 to 10).map(x => Data1(s"$x", x)) > .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache > scala> val partition2 = sc.parallelize(1 to 10).map(x => Data2(s"$x", x)) > .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache > {code} > Join on key > {code} > scala> partition1.join(partition2, "key").explain > == Physical Plan == > Project [key#0,value1#1,value2#13] > +- SortMergeJoin [key#0], [key#12] >:- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation > [key#0,value1#1], true, 1, StorageLevel(true, true, false, true, 1), Sort > [key#0 ASC], false, 0, None >+- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation > [key#12,value2#13], true, 1, StorageLevel(true, true, false, true, 1), > Sort [key#12 ASC], false, 0, None > {code} > And we get a super efficient join with no shuffle. > But if we add a filter our join gets less efficient and we end up with a > shuffle. 
> {code} > scala> partition1.join(partition2, "key").filter($"value1" === > $"value2").explain > == Physical Plan == > Project [key#0,value1#1,value2#13] > +- SortMergeJoin [value1#1,key#0], [value2#13,key#12] >:- Sort [value1#1 ASC,key#0 ASC], false, 0 >: +- TungstenExchange hashpartitioning(value1#1,key#0,200), None >: +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation > [key#0,value1#1], true, 1, StorageLevel(true, true, false, true, 1), Sort > [key#0 ASC], false, 0, None >+- Sort [value2#13 ASC,key#12 ASC], false, 0 > +- TungstenExchange hashpartitioning(value2#13,key#12,200), None > +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation > [key#12,value2#13], true, 1, St
[jira] [Commented] (SPARK-21867) Support async spilling in UnsafeShuffleWriter
[ https://issues.apache.org/jira/browse/SPARK-21867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287461#comment-16287461 ] Eyal Farago commented on SPARK-21867: - [~ericvandenbergfb], looks good, a few questions though: 1. What is the initial number of sorters? 2. Assuming the initial number of sorters is >1, doesn't this mean you're potentially increasing the number of spills? If a single sorter would spill after N records, a multi-sorter will spill after N/k records (where k is the number of sorters); doesn't this mean more spills and merges? 3. When a sorter hits the spill threshold, does it immediately spill, or will it keep going if there's available execution memory? It might make sense to spill and raise the threshold in this condition... 4. Merging spills may require some attention: compression, encryption, etc., as well as avoiding too many open files/buffers at the same time. Looking forward to the PR. > Support async spilling in UnsafeShuffleWriter > - > > Key: SPARK-21867 > URL: https://issues.apache.org/jira/browse/SPARK-21867 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Priority: Minor > Attachments: Async ShuffleExternalSorter.pdf > > > Currently, Spark tasks are single-threaded. But we see it could greatly > improve the performance of the jobs, if we can multi-thread some part of it. > For example, profiling our map tasks, which read large amounts of data from > HDFS and spill to disk, we see that we are blocked on HDFS read and spilling > the majority of the time. Since both these operations are IO intensive the > average CPU consumption during map phase is significantly low. In theory, > both HDFS read and spilling can be done in parallel if we had additional > memory to store data read from HDFS while we are spilling the last batch read. > Let's say we have 1G of shuffle memory available per task. 
Currently, in case > of map task, it reads from HDFS and the records are stored in the available > memory buffer. Once we hit the memory limit and there is no more space to > store the records, we sort and spill the content to disk. While we are > spilling to disk, since we do not have any available memory, we can not read > from HDFS concurrently. > Here we propose supporting async spilling for UnsafeShuffleWriter, so that we > can support reading from HDFS when sort and spill is happening > asynchronously. Let's say the total 1G of shuffle memory can be split into > two regions - active region and spilling region - each of size 500 MB. We > start with reading from HDFS and filling the active region. Once we hit the > limit of active region, we issue an asynchronous spill, while flipping the > active region and spilling region. While the spill is happening > asynchronously, we still have 500 MB of memory available to read the data > from HDFS. This way we can amortize the high disk/network io cost during > spilling. > We made a prototype hack to implement this feature and we could see our map > tasks were as much as 40% faster. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
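The spill-count concern in question 2 of the comment can be put into a small arithmetic sketch. It assumes a deliberately simple model that is not taken from the attached design document: a fixed memory budget is split evenly into regions, one region fills at a time, and a region spills exactly when it is full. `SpillCountSketch` and `fullBufferSpills` are hypothetical names.

```scala
// Illustrative only: a back-of-the-envelope model, not UnsafeShuffleWriter's behaviour.
object SpillCountSketch {
  // Number of full-region spills when `records` records flow through a budget of
  // `totalCapacity` records split evenly across `regions` buffers.
  // The final, partially filled region is not counted.
  def fullBufferSpills(records: Long, totalCapacity: Long, regions: Int): Long = {
    require(regions > 0 && totalCapacity >= regions, "each region must hold at least one record")
    val perRegionCapacity = totalCapacity / regions
    records / perRegionCapacity
  }
}
```

Under this model, 10,000 records with a budget of 1,000 give 10 spills with one region and 20 with two, i.e. k regions produce roughly k times as many, smaller spill files. Whether the overlap of reading and spilling outweighs the extra merge work is the open question the comment raises.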
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266102#comment-16266102 ] Eyal Farago commented on SPARK-22579: - [~srowen], I couldn't see a place where this data is stored for later use. Even in the 'big fetch' scenario the file isn't reused, as org.apache.spark.storage.BlockManager#remoteBlockTempFileManager is only used to stream a big file once; it's never used to figure out whether the download can be avoided. Furthermore, these blocks are also not registered with the block manager, even though they could serve as a 'DISK_ONLY' cache on the requesting executor and potentially even for other executors residing on the same node. [~jerryshao], it seems the 'big files' code path actually uses streaming, which makes me think it can be slightly modified to accept a stream handler (the current behavior can be achieved by passing in a download stream handler). One thing that does have the potential for trouble is error recovery when more than one executor can serve the block: the current implementation simply moves to the next one, whereas a streaming-based approach would have to request the rest of the block from another executor (assuming blocks are identical). > BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be > implemented using streaming > -- > > Key: SPARK-22579 > URL: https://issues.apache.org/jira/browse/SPARK-22579 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Spark Core >Affects Versions: 2.1.0 >Reporter: Eyal Farago > > when an RDD partition is cached on an executor but the task requiring it is > running on another executor (process locality ANY), the cached partition is > fetched via BlockManager.getRemoteValues which delegates to > BlockManager.getRemoteBytes, both calls are blocking. > in my use case I had a 700GB RDD spread over 1000 partitions on a 6 nodes > cluster, cached to disk. rough math shows that average partition size is > 700MB. 
> looking at the Spark UI it was obvious that tasks running with process locality > 'ANY' are much slower than local tasks (~40 seconds versus 8-10 minutes), I > was able to capture thread dumps of executors executing remote tasks and got > this stack trace: > {quote}Thread ID Thread Name Thread State Thread Locks > 1521 Executor task launch worker-1000 WAITING > Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978}) > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > scala.concurrent.Await$.result(package.scala:190) > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) > org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104) > org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582) > org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550) > org.apache.spark.storage.BlockManager.get(BlockManager.scala:638) > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690) > org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) > 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote} > digging into the code showed that the block manager first fetches all bytes > (getRemoteBytes) and then wraps them with a deserialization stream, this has > several drawbacks: > 1. blocking, requesting executor is blocked while the remote executor is > serving the block. > 2. potentially large memory footprint on requesting executor, in my use case > a 700MB of raw bytes stored in a ChunkedByteBuffer. > 3. inefficient, requesting side usual
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265045#comment-16265045 ] Eyal Farago commented on SPARK-22579: - [~jerryshao], SPARK-22062 seems to solve the memory footprint issue; I will take a deeper look at it (and pray for MapR to upgrade their Spark version). [~srowen], I'm not sure what you mean by reading the data twice. getRemoteValues returns an iterator; if it is called twice on the same block it will fetch the data twice, even with the current implementation. And yes, I agree I need to read the data anyway, but I don't need all of it at once. Executors consume partitions via iterators, which allows for streaming/pagination behind the scenes, and this is currently not implemented by the block manager. BTW, this kind of streaming/pagination can play nicely with SPARK-22062 and with any potential caching mechanism that might be introduced into the block manager in the future. I'd also like to stress that what we experienced was lengthy tasks. It is a bit unfortunate that these tasks were 'dead in the water' waiting for the transfer to complete, as they could have made progress with the part of the transfer that had already completed. I believe streaming/pagination or some other reactive approach can greatly improve this behavior. > BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be > implemented using streaming > -- > > Key: SPARK-22579 > URL: https://issues.apache.org/jira/browse/SPARK-22579 > Project: Spark > Issue Type: Improvement > Components: Block Manager, Spark Core >Affects Versions: 2.1.0 >Reporter: Eyal Farago > > when an RDD partition is cached on an executor but the task requiring it is > running on another executor (process locality ANY), the cached partition is > fetched via BlockManager.getRemoteValues which delegates to > BlockManager.getRemoteBytes, both calls are blocking. 
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262120#comment-16262120 ] Eyal Farago commented on SPARK-22579: - CC: [~hvanhovell] (we've discussed this privately), [~joshrosen], [~jerryshao2015] (git annotations point at you guys :-) ) > BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be > implemented using streaming > -- > > Key: SPARK-22579 > URL: https://issues.apache.org/jira/browse/SPARK-22579 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 2.1.0 >Reporter: Eyal Farago > > when an RDD partition is cached on an executor but the task requiring it is > running on another executor (process locality ANY), the cached partition is > fetched via BlockManager.getRemoteValues which delegates to > BlockManager.getRemoteBytes, both calls are blocking. > in my use case I had a 700GB RDD spread over 1000 partitions on a 6-node > cluster, cached to disk. rough math shows that average partition size is > 700MB. 
> looking at the Spark UI it was obvious that tasks running with process locality > 'ANY' are much slower than local tasks (~40 seconds versus 8-10 minutes), I > was able to capture thread dumps of executors executing remote tasks and got > this stack trace: > {quote}Thread ID Thread Name Thread State Thread Locks > 1521 Executor task launch worker-1000 WAITING > Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978}) > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > scala.concurrent.Await$.result(package.scala:190) > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) > org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104) > org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582) > org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550) > org.apache.spark.storage.BlockManager.get(BlockManager.scala:638) > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690) > org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) > 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote} > digging into the code showed that the block manager first fetches all bytes > (getRemoteBytes) and then wraps them with a deserialization stream, this has > several drawbacks: > 1. blocking, requesting executor is blocked while the remote executor is > serving the block. > 2. potentially large memory footprint on requesting executor, in my use case > a 700MB of raw bytes stored in a ChunkedByteBuffer. > 3. inefficient, requesting side usually doesn't need all values at once as it > consumes the values via an iterator. > 4. potentially large memory footprint on serving executor, in case the block > is cached in deserialized form the serving executor has to serialize it into > a ChunkedByteBuffer (BlockManager.doGetLocalBytes). this is both memory & CPU > intensive, memory footprint can be reduced by using a limited buffer for > serialization 'spilling' to the response stream. > I suggest improving this either by implementing a full streaming mechanism or > some kind of pagination mechanism, in addition the requesting executor should > be able to make progress with the data it already has, blocking only when > local buffer is exhausted and remote side didn't deliver the next chunk of > the stream (or page in case of pagination) yet. -- This message was sent by Atlassian JIRA (v6.4.14#64029) -
[jira] [Created] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
Eyal Farago created SPARK-22579: --- Summary: BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming Key: SPARK-22579 URL: https://issues.apache.org/jira/browse/SPARK-22579 Project: Spark Issue Type: Bug Components: Block Manager, Spark Core Affects Versions: 2.1.0 Reporter: Eyal Farago when an RDD partition is cached on an executor but the task requiring it is running on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues which delegates to BlockManager.getRemoteBytes, both calls are blocking. in my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. rough math shows that average partition size is 700MB. looking at the Spark UI it was obvious that tasks running with process locality 'ANY' are much slower than local tasks (~40 seconds versus 8-10 minutes), I was able to capture thread dumps of executors executing remote tasks and got this stack trace: {quote}Thread ID Thread Name Thread State Thread Locks 1521 Executor task launch worker-1000 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978}) sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
scala.concurrent.Await$.result(package.scala:190) org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104) org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582) org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550) org.apache.spark.storage.BlockManager.get(BlockManager.scala:638) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) org.apache.spark.rdd.RDD.iterator(RDD.scala:285) org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote} digging into the code showed that the block manager first fetches all bytes (getRemoteBytes) and then wraps them with a deserialization stream, this has several drawbacks: 1. blocking, requesting executor is blocked while the remote executor is serving the block. 2. potentially large memory footprint on requesting executor, in my use case a 700MB of raw bytes stored in a ChunkedByteBuffer. 3. inefficient, requesting side usually doesn't need all values at once as it consumes the values via an iterator. 4. potentially large memory footprint on serving executor, in case the block is cached in deserialized form the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). this is both memory & CPU intensive, memory footprint can be reduced by using a limited buffer for serialization 'spilling' to the response stream. 
I suggest improving this either by implementing a full streaming mechanism or some kind of pagination mechanism, in addition the requesting executor should be able to make progress with the data it already has, blocking only when local buffer is exhausted and remote side didn't deliver the next chunk of the stream (or page in case of pagination) yet. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
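The streaming proposal in SPARK-22579 above (make progress with the data already received, block only when the local buffer is exhausted) can be sketched roughly as follows. This is a minimal illustration in Java, not Spark's BlockManager code: the class StreamingChunkIterator and the callbacks onChunk and onComplete are hypothetical names, and error propagation or failover to another executor is deliberately left out.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical sketch (not a Spark class): an iterator over chunks of a remote
 * block. The network side pushes chunks in, the task pulls them out.
 */
final class StreamingChunkIterator implements Iterator<byte[]> {
    // End-of-stream marker, recognized by identity, never handed to the consumer.
    private static final byte[] END = new byte[0];

    // Bounded buffer: caps how many chunks sit in memory on the requesting side.
    private final BlockingQueue<byte[]> buffer;
    private byte[] pending;     // chunk fetched by hasNext() but not yet returned
    private boolean finished;

    StreamingChunkIterator(int maxBufferedChunks) {
        this.buffer = new ArrayBlockingQueue<>(maxBufferedChunks);
    }

    /** Called by the (assumed) transfer layer per chunk; blocks while the buffer is full. */
    void onChunk(byte[] chunk) throws InterruptedException {
        buffer.put(chunk);
    }

    /** Called by the (assumed) transfer layer once the whole block was sent. */
    void onComplete() throws InterruptedException {
        buffer.put(END);
    }

    @Override
    public boolean hasNext() {
        if (finished) {
            return false;
        }
        if (pending != null) {
            return true;
        }
        try {
            // Blocks only when no chunk is buffered locally.
            byte[] taken = buffer.take();
            if (taken == END) {
                finished = true;
                return false;
            }
            pending = taken;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the next chunk", e);
        }
    }

    @Override
    public byte[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        byte[] result = pending;
        pending = null;
        return result;
    }
}
```

With a small buffer size the producer is throttled by the consumer, so the requesting executor never holds the whole 700MB partition at once; a deserialization stream could be layered on top of the chunks.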
[jira] [Commented] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()
[ https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160366#comment-16160366 ] Eyal Farago commented on SPARK-21907: - opened PR: https://github.com/apache/spark/pull/19181 > NullPointerException in UnsafeExternalSorter.spill() > > > Key: SPARK-21907 > URL: https://issues.apache.org/jira/browse/SPARK-21907 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Juliusz Sompolski > > I see NPE during sorting with the following stacktrace: > {code} > java.lang.NullPointerException > at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43) > at > org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281) > at > org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221) > at > 
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281) > at > org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685) > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259) > at > org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.u
[jira] [Commented] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()
[ https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160062#comment-16160062 ] Eyal Farago commented on SPARK-21907: - [~juliuszsompolski], I've followed the stack trace you attached (as far as that is possible with master's code) and I tend to agree with your assumption about UnsafeInMemorySorter.reset. It seems that allocateArray did fail on OOM and triggered a nested spill. The problem is that by this point array points to an already freed block, so the first time array is actually accessed (when TimSort invokes the comparator) it fails with an NPE (assuming asserts are really turned off in your environment). I think the way to solve this is to temporarily set array (and the relevant pos, capacity, etc.) to null/zero/some other value indicating a currently unreadable/empty buffer. [~kiszk], what do you think? > NullPointerException in UnsafeExternalSorter.spill() > > > Key: SPARK-21907 > URL: https://issues.apache.org/jira/browse/SPARK-21907 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Juliusz Sompolski > > I see NPE during sorting with the following stacktrace: > {code} > java.lang.NullPointerException > at > org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43) > at > org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270) > at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142) > at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345) > at > 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281) > at > org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281) > at > org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400) > at > org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) > at > org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778) > at > org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685) > at > 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259) > at > org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegen
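The fix idea suggested in the SPARK-21907 comment above (mark the buffer as empty before asking for new memory, so that a nested spill finds nothing to read) can be illustrated with a toy model. This is a hedged sketch only: PointerBuffer and its Allocator hook are hypothetical stand-ins, not UnsafeInMemorySorter or TaskMemoryManager, and nothing here claims to match the change made in the actual PR.

```java
/**
 * Toy model (hypothetical, not Spark code) of a sorter buffer whose reset()
 * may re-enter spill() while it is allocating a fresh array.
 */
final class PointerBuffer {
    /** Assumed allocation hook; under memory pressure it may call back into spill(). */
    interface Allocator {
        long[] allocate(int capacity);
    }

    private final Allocator allocator;
    private final int capacity;
    private long[] array;        // null means "no readable buffer right now"
    private int pos;             // number of valid entries in array
    private int spilledRecords;  // stands in for records written to disk

    PointerBuffer(Allocator allocator, int capacity) {
        this.allocator = allocator;
        this.capacity = capacity;
        this.array = new long[capacity];
    }

    void insert(long pointer) {
        array[pos++] = pointer;  // growth is omitted to keep the sketch short
    }

    int size() {
        return pos;
    }

    int spilledRecords() {
        return spilledRecords;
    }

    /** Spills the buffered records and returns how many were spilled. */
    int spill() {
        if (array == null || pos == 0) {
            return 0;            // empty or unreadable buffer: a nested spill is a no-op
        }
        int spilled = pos;
        spilledRecords += spilled;
        reset();
        return spilled;
    }

    void reset() {
        // Mark the buffer unreadable BEFORE allocating, because allocate()
        // may trigger a nested spill() on this same object.
        array = null;
        pos = 0;
        array = allocator.allocate(capacity);
    }
}
```

If reset() allocated first and cleared pos afterwards, the nested spill() would still see the stale entry count and read a buffer that had already been released, which is the failure mode the comment describes.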
[jira] [Commented] (SPARK-3151) DiskStore attempts to map any size BlockId without checking MappedByteBuffer limit
[ https://issues.apache.org/jira/browse/SPARK-3151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118347#comment-16118347 ] Eyal Farago commented on SPARK-3151: created PR 18855: https://github.com/apache/spark/pull/18855 > DiskStore attempts to map any size BlockId without checking MappedByteBuffer > limit > -- > > Key: SPARK-3151 > URL: https://issues.apache.org/jira/browse/SPARK-3151 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 1.0.2 > Environment: IBM 64-bit JVM PPC64 >Reporter: Damon Brown >Priority: Minor > > [DiskStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskStore.scala] > attempts to memory map the block file in {{def getBytes}}. If the file is > larger than 2GB (Integer.MAX_VALUE) as specified by > [FileChannel.map|http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map%28java.nio.channels.FileChannel.MapMode,%20long,%20long%29], > then the memory map fails. > {code} > Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) # line > 104 > {code}
[jira] [Commented] (SPARK-3151) DiskStore attempts to map any size BlockId without checking MappedByteBuffer limit
[ https://issues.apache.org/jira/browse/SPARK-3151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074392#comment-16074392 ] Eyal Farago commented on SPARK-3151: I just hit this with Spark 2.1 when processing a disk-persisted RDD. While the root cause is probably data skew, this seems like a severe limitation in Spark. This specific failure is a bit surprising, as Spark already knows the block size on disk at this point (it maps the entire file from offset 0 to the file size), so it should easily be possible to split this into several blocks; after all, the code is using ChunkedByteBuffer. A better approach would be to lazily load the blocks as deserialization progresses, and an even better solution (proposed by [~matei] in [comment|https://issues.apache.org/jira/browse/SPARK-1476?focusedCommentId=13967947&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13967947] on SPARK-1476) would be to split these blocks at higher levels of Spark (a mapper that produces multiple blocks, a cached RDD partition that consists of several blocks...). I can see the attached PR is closed; is there an expected/in-progress fix for this? 
2017-07-05 07:04:51,368 UTC WARN task-result-getter-0 org.apache.spark.scheduler.TaskSetManager Lost task 131.0 in stage 14.0 (TID 2228, 172.20.1.137, executor 1): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:465) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:701) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:748) > DiskStore attempts to map any size BlockId without checking MappedByteBuffer > limit > -- > > Key: SPARK-3151 > URL: https://issues.apache.org/jira/browse/SPARK-3151 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 1.0.2 > Environment: IBM 64-bit JVM PPC64 >Reporter: Damon Brown >Priority: Minor > > [DiskStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/DiskStore.scala] > attempts to memory map the block file in {{def getBytes}}. If the file is > larger than 2GB (Integer.MAX_VALUE) as specified by > [FileChannel.map|http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map%28java.nio.channels.FileChannel.MapMode,%20long,%20long%29], > then the memory map fails. > {code} > Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) # line > 104 > {code}
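The suggestion in the SPARK-3151 comment above, splitting one oversized mapping into several smaller ones, could look roughly like the following. This is a sketch under stated assumptions, not the DiskStore implementation and not the content of the linked PR: ChunkedFileMapper, chunkRanges and mapInChunks are hypothetical names, and only the documented FileChannel.map call is taken from the JDK.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not Spark's DiskStore): maps a file region as bounded chunks. */
final class ChunkedFileMapper {
    private ChunkedFileMapper() {}

    /** Splits [offset, offset + length) into {start, size} pairs of at most maxChunk bytes. */
    static List<long[]> chunkRanges(long offset, long length, int maxChunk) {
        if (offset < 0 || length < 0 || maxChunk <= 0) {
            throw new IllegalArgumentException("offset/length must be >= 0 and maxChunk > 0");
        }
        List<long[]> ranges = new ArrayList<>();
        long pos = offset;
        long remaining = length;
        while (remaining > 0) {
            long size = Math.min(remaining, (long) maxChunk);
            ranges.add(new long[] {pos, size});
            pos += size;
            remaining -= size;
        }
        return ranges;
    }

    /** Maps each range separately, so no single map() call exceeds Integer.MAX_VALUE bytes. */
    static List<MappedByteBuffer> mapInChunks(Path file, long offset, long length, int maxChunk)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            List<MappedByteBuffer> chunks = new ArrayList<>();
            for (long[] range : chunkRanges(offset, length, maxChunk)) {
                chunks.add(channel.map(MapMode.READ_ONLY, range[0], range[1]));
            }
            // A mapping stays valid after the channel that created it is closed.
            return chunks;
        }
    }
}
```

The resulting list of buffers has the same shape as a multi-chunk byte buffer, which is why the comment notes that the existing ChunkedByteBuffer abstraction should make such a split straightforward. Eagerly mapping every chunk is the simplest variant; the lazy loading mentioned in the comment would map each range only when deserialization reaches it.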
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15961954#comment-15961954 ] Eyal Farago commented on SPARK-19870: - [~stevenruppert], logs from the hung executor might shed some light on this. Your comment about JVM machinery not allowing the program to make progress is not completely accurate: the lock on the BlockInfoManager isn't really held once the thread enters wait(), and the other locks held further up the stack are not required in order to get to the BlockInfoManager's lock. My guess is that this block is somehow dropped from memory by other tasks running on this executor (the drop code path does not go through the TorrentBroadcast code path, hence it can make progress without locking it). [~joshrosen], it seems my 'theory' regarding the completion iterator does not hold, as it calls the completion on a hasNext() call that returns false, which just doesn't happen in this case. > Repeatable deadlock on BlockInfoManager and TorrentBroadcast > > > Key: SPARK-19870 > URL: https://issues.apache.org/jira/browse/SPARK-19870 > Project: Spark > Issue Type: Bug > Components: Block Manager, Shuffle >Affects Versions: 2.0.2, 2.1.0 > Environment: ubuntu linux 14.04 x86_64 on ec2, hadoop cdh 5.10.0, > yarn coarse-grained. >Reporter: Steven Ruppert > Attachments: stack.txt > > > Running what I believe to be a fairly vanilla spark job, using the RDD api, > with several shuffles, a cached RDD, and finally a conversion to DataFrame to > save to parquet. I get a repeatable deadlock at the very last reducers of one > of the stages. 
> Roughly:
> {noformat}
> "Executor task launch worker-6" #56 daemon prio=5 os_prio=0 tid=0x7fffd88d3000 nid=0x1022b9 waiting for monitor entry [0x7fffb95f3000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:207)
>     - waiting to lock <0x0005445cfc00> (a org.apache.spark.broadcast.TorrentBroadcast$)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>     - locked <0x0005b12f2290> (a org.apache.spark.broadcast.TorrentBroadcast)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> and
> {noformat}
> "Executor task launch worker-5" #55 daemon prio=5 os_prio=0 tid=0x7fffd88d nid=0x1022b8 in Object.wait() [0x7fffb96f4000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Object.wait(Object.java:502)
>     at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:202)
>     - locked <0x000545736b58> (a org.apache.spark.storage.BlockInfoManager)
>     at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:444)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
>     - locked <0x0005445cfc00> (a org.apache.spark.broadcast.TorrentBroadcast$)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>     - locked <0x00059711eb10> (a org.apache.spark.broadcast.TorrentBroadcast)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>     at
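The point about wait() can be checked in isolation. The following is a minimal sketch, not Spark code: the object and method names are made up for illustration. It shows that a thread parked in wait() has given up the monitor, even though a thread dump still lists the monitor as "locked" in that thread's stack.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical demo object; nothing here is taken from Spark.
object WaitReleasesMonitor {
  /** Returns true if a second thread managed to enter `lock.synchronized`
    * while the first thread was parked inside `lock.wait()`. */
  def secondThreadEntersWhileFirstWaits(): Boolean = {
    val lock = new Object
    val waiterHoldsMonitor = new CountDownLatch(1)
    var ready = false   // guarded by `lock`
    var entered = false // written only by the calling thread

    val waiter = new Thread(new Runnable {
      def run(): Unit = lock.synchronized {
        waiterHoldsMonitor.countDown()
        while (!ready) lock.wait() // wait() releases the monitor while parked
      }
    })
    waiter.start()
    waiterHoldsMonitor.await()

    // This block can only be entered after the waiter has released the
    // monitor, i.e. once it is inside wait().
    lock.synchronized {
      entered = true
      ready = true
      lock.notifyAll()
    }
    waiter.join(5000)
    entered && !waiter.isAlive
  }
}
```

Calling `WaitReleasesMonitor.secondThreadEntersWhileFirstWaits()` should return true, since the calling thread can only set `ready` after acquiring the same monitor the waiter entered first.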
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959169#comment-15959169 ]

Eyal Farago commented on SPARK-19870:
-------------------------------------

Steven, I meant traces produced by logging.

Sean, this looks like a missed notification: one thread holds a lock and waits for a notification (via Java's synchronized/wait/notify mechanism), and a second thread attempts to synchronize on the same object while the first waits for a notification plus a condition that never happens. This code contains plenty of trace logging that can shed some light on the issue.
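The "notification plus condition" pattern mentioned in this comment can be sketched as follows. This is an illustrative model only: `Gate`, `open` and `awaitOpen` are hypothetical names, and the timeout is added purely so the example terminates. The reported hang has no such timeout.

```scala
// Hypothetical sketch of a guarded wait; not Spark's BlockInfoManager.
object MissedNotificationSketch {
  final class Gate {
    private var isOpen = false // guarded by this object's monitor

    /** Sets the condition and wakes every waiter. */
    def open(): Unit = synchronized {
      isOpen = true
      notifyAll()
    }

    /** Waits until the gate is open or `timeoutMs` elapses.
      * Returns whether the gate was open when the wait ended. */
    def awaitOpen(timeoutMs: Long): Boolean = synchronized {
      val deadline = System.nanoTime() + timeoutMs * 1000000L
      var remainingMs = timeoutMs
      // Re-check the condition in a loop: a wake-up alone proves nothing.
      while (!isOpen && remainingMs > 0) {
        wait(remainingMs) // releases the monitor while waiting
        remainingMs = (deadline - System.nanoTime()) / 1000000L
      }
      isOpen
    }
  }
}
```

If `open()` is never called, `awaitOpen` gives up only because of the timeout; with an untimed wait() the thread would stay in WAITING indefinitely, which is the shape of the second stack trace in the report.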
[jira] [Commented] (SPARK-19870) Repeatable deadlock on BlockInfoManager and TorrentBroadcast
[ https://issues.apache.org/jira/browse/SPARK-19870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958894#comment-15958894 ]

Eyal Farago commented on SPARK-19870:
-------------------------------------

[~stevenruppert], can you attach traces?

[~joshrosen], a glance at the torrent broadcast code shows that it explicitly [releases the read-lock|https://github.com/apache/spark/blob/d009fb369bbea0df81bbcf9c8028d14cfcaa683b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L214] on the block, even though the [iterator obtained from the BlockResult|https://github.com/apache/spark/blob/d009fb369bbea0df81bbcf9c8028d14cfcaa683b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L497] is a completion iterator that releases locks (the read lock in this scenario) upon completion. Is it possible that the torrent code 'double-releases' the read lock on the relevant block, putting it in some inconsistent state that prevents the next read from obtaining the lock (or somehow fails to notify on release)?

[~joshrosen], it seems at least part of this is related to your work on SPARK-12757, hence tagging you.
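To make the 'double-release' question concrete, here is a toy reader-count lock. It is a hypothetical model written for this note, not Spark's BlockInfoManager, and it makes no claim about whether Spark guards against this case. It only shows how an unchecked second release can leave a naive counter in an inconsistent state, and how a checked release fails fast instead.

```scala
// Toy model; class and method names are assumptions made for illustration.
object DoubleReleaseSketch {
  final class ReadLock {
    private var readers = 0 // guarded by this object's monitor

    def acquire(): Unit = synchronized { readers += 1 }

    /** Unchecked release: a second call for one acquire drives the count negative. */
    def releaseUnchecked(): Unit = synchronized { readers -= 1 }

    /** Checked release: throws instead of corrupting the count. */
    def releaseChecked(): Unit = synchronized {
      require(readers > 0, "release without a matching acquire")
      readers -= 1
    }

    def readerCount: Int = synchronized(readers)
  }
}
```

With the unchecked variant, one acquire followed by two releases leaves `readerCount` at -1, so any later check of the form "no readers are present" would behave unexpectedly.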
[jira] [Commented] (SPARK-18736) CreateMap allows non-unique keys
[ https://issues.apache.org/jira/browse/SPARK-18736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15851078#comment-15851078 ]

Eyal Farago commented on SPARK-18736:
-------------------------------------

SPARK-8601 is making (slow) progress; it's actually in the final stages of review now.
One thing to note is the need to keep direct evaluation, code generation, and the optimized version in sync. SPARK-8601 follows the current "first wins" behavior by transforming map(...).Get(k) into a caseKeyWhen(k, ...).
If we decide on a "last wins" approach, the optimized version would probably have to reverse the order of the keys.

> CreateMap allows non-unique keys
> --------------------------------
>
> Key: SPARK-18736
> URL: https://issues.apache.org/jira/browse/SPARK-18736
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Eyal Farago
> Labels: map, sql, types
>
> In Spark SQL, {{CreateMap}} does not enforce unique keys, i.e. it's possible to
> create a map with two identical keys:
> {noformat}
> CreateMap(Literal(1), Literal(11), Literal(1), Literal(12))
> {noformat}
> This does not behave like standard maps in common programming languages.
> A proper behavior should be chosen:
> # first 'wins'
> # last 'wins'
> # runtime error.
> {{GetMapValue}} currently implements option #1. Even if this is the desired
> behavior, {{CreateMap}} should return a unique map.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
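The difference between the two lookup semantics discussed in this comment can be sketched with plain Scala collections. This is not Catalyst code: `firstWins` and `lastWins` are hypothetical helpers that only mimic how a chain of "when key = k then value" branches picks the first match, and how reversing the pairs turns the same scan into "last wins".

```scala
// Illustrative helpers operating on key/value pairs; not part of Spark.
object MapLookupSketch {
  /** "First wins": scan the pairs in order and return the first matching value. */
  def firstWins[K, V](pairs: Seq[(K, V)], key: K): Option[V] =
    pairs.collectFirst { case (k, v) if k == key => v }

  /** "Last wins", expressed through the same first-match scan over reversed pairs. */
  def lastWins[K, V](pairs: Seq[(K, V)], key: K): Option[V] =
    firstWins(pairs.reverse, key)
}
```

For the pairs (1, 11), (1, 12) from the issue description, `firstWins` returns the value 11 for key 1, while `lastWins` returns 12.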
[jira] [Commented] (SPARK-18736) CreateMap allows non-unique keys
[ https://issues.apache.org/jira/browse/SPARK-18736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15726232#comment-15726232 ]

Eyal Farago commented on SPARK-18736:
-------------------------------------

@shuai Lin, I already have a PR in progress that addresses the possible literal-key optimizations during the optimization phase (SPARK-8601). Can you please implement the proper behavior at runtime and in code generation, and update the Jira with the chosen approach?
This way we can align the two PRs.

Thanks,
Eyal.
[jira] [Created] (SPARK-18736) [SQL] CreateMap allow non-unique keys
Eyal Farago created SPARK-18736:
-----------------------------------

Summary: [SQL] CreateMap allow non-unique keys
Key: SPARK-18736
URL: https://issues.apache.org/jira/browse/SPARK-18736
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.0
Reporter: Eyal Farago

In Spark SQL, CreateMap does not enforce unique keys, i.e. it's possible to create a map with two identical keys:
CreateMap(Literal(1), Literal(11), Literal(1), Literal(12))

This does not behave like standard maps in common programming languages.
A proper behavior should be chosen:
1. first 'wins'
2. last 'wins'
3. runtime error.

* Currently GetMapValue implements option #1. Even if this is the desired behavior, CreateMap should return a unique map.
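The three candidate behaviors listed in the issue can be sketched on plain key/value pairs. This is a minimal illustration under the assumption that a map is built from an ordered sequence of pairs; the helper names are hypothetical and none of this is Catalyst code.

```scala
import scala.collection.mutable.LinkedHashMap

// Hypothetical helpers, one per option from the issue description.
object UniqueMapSketch {
  /** Option 1: keep the first value seen for each key. */
  def firstWins[K, V](pairs: Seq[(K, V)]): Seq[(K, V)] = {
    val seen = LinkedHashMap.empty[K, V]
    pairs.foreach { case (k, v) => if (!seen.contains(k)) seen(k) = v }
    seen.toSeq
  }

  /** Option 2: keep the last value seen for each key. */
  def lastWins[K, V](pairs: Seq[(K, V)]): Seq[(K, V)] = {
    val seen = LinkedHashMap.empty[K, V]
    pairs.foreach { case (k, v) => seen(k) = v }
    seen.toSeq
  }

  /** Option 3: reject duplicate keys with a runtime error. */
  def failOnDuplicate[K, V](pairs: Seq[(K, V)]): Seq[(K, V)] = {
    val keys = pairs.map(_._1)
    require(keys.distinct.size == keys.size, "duplicate map key")
    pairs
  }
}
```

Whichever option is chosen, the result holds each key at most once, which is the property the issue asks CreateMap to guarantee.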
[jira] [Commented] (SPARK-14804) Graph vertexRDD/EdgeRDD checkpoint results ClassCastException:
[ https://issues.apache.org/jira/browse/SPARK-14804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561452#comment-15561452 ]

Eyal Farago commented on SPARK-14804:
-------------------------------------

I think this relates to SPARK-12431; is it possible to 'merge' both efforts?

> Graph vertexRDD/EdgeRDD checkpoint results ClassCastException:
> --------------------------------------------------------------
>
> Key: SPARK-14804
> URL: https://issues.apache.org/jira/browse/SPARK-14804
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.1
> Reporter: SuYan
> Priority: Minor
>
> {code}
> graph3.vertices.checkpoint()
> graph3.vertices.count()
> graph3.vertices.map(_._2).count()
> {code}
> 16/04/21 21:04:43 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 13, localhost): java.lang.ClassCastException: org.apache.spark.graphx.impl.ShippableVertexPartition cannot be cast to scala.Tuple2
>     at com.xiaomi.infra.codelab.spark.Graph2$$anonfun$main$1.apply(Graph2.scala:80)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:91)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:219)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Look at the code:
> {code}
> private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
> {
>   if (isCheckpointedAndMaterialized) {
>     firstParent[T].iterator(split, context)
>   } else {
>     compute(split, context)
>   }
> }
> private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
> override def isCheckpointed: Boolean = {
>   firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
> }
> {code}
> For VertexRDD or EdgeRDD, the first parent is its partitionRDD:
> RDD[ShippableVertexPartition[VD]] / RDD[(PartitionID, EdgePartition[ED, VD])].
> 1. We call vertexRDD.checkpoint; its partitionRDD is checkpointed, so
> VertexRDD.isCheckpointedAndMaterialized = true.
> 2. Then we call vertexRDD.iterator; because checkpoint = true, it calls
> firstParent.iterator (which is not a CheckpointRDD, but actually the partitionRDD).
>
> So the returned iterator is an Iterator[ShippableVertexPartition], not the expected
> Iterator[(VertexId, VD)].
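The failure mode described in the quoted report, where the parent's iterator is handed out under the child's element type, can be reproduced without Spark. The sketch below is a toy model: `Partition` and the helper methods are hypothetical stand-ins, not GraphX classes. Because of type erasure the cast itself succeeds, and the ClassCastException only appears when an element is used as a tuple, which matches where the reported stack trace fails.

```scala
import scala.util.Try

// Toy model of the type confusion; names are assumptions for illustration.
object ErasedIteratorSketch {
  /** Stand-in for a vertex partition holding several (id, attribute) pairs. */
  final case class Partition(vertices: Seq[(Long, String)])

  /** Mimics returning the parent's iterator typed as the child's element type.
    * The cast is unchecked at runtime because of erasure. */
  def parentIteratorAsChildType(parts: Seq[Partition]): Iterator[(Long, String)] =
    parts.iterator.asInstanceOf[Iterator[(Long, String)]]

  /** True if using an element as a tuple fails with a ClassCastException. */
  def failsOnUse(parts: Seq[Partition]): Boolean =
    Try(parentIteratorAsChildType(parts).map(_._2).toList)
      .failed.toOption.exists(_.isInstanceOf[ClassCastException])

  /** What the child is expected to yield: the flattened vertices. */
  def expected(parts: Seq[Partition]): Iterator[(Long, String)] =
    parts.iterator.flatMap(_.vertices)
}
```

In this model, `failsOnUse` reports the same kind of error as `graph3.vertices.map(_._2).count()` in the report, while `expected` shows the element type the caller was relying on.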
[jira] [Commented] (SPARK-12431) add local checkpointing to GraphX
[ https://issues.apache.org/jira/browse/SPARK-12431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561437#comment-15561437 ]

Eyal Farago commented on SPARK-12431:
-------------------------------------

I think this relates closely to SPARK-14804. Assuming this is an ongoing effort, I think it actually depends on the resolution of SPARK-14804.

> add local checkpointing to GraphX
> ---------------------------------
>
> Key: SPARK-12431
> URL: https://issues.apache.org/jira/browse/SPARK-12431
> Project: Spark
> Issue Type: Improvement
> Components: GraphX
> Affects Versions: 1.5.2
> Reporter: Edward Seidl
>
> local checkpointing was added to RDD to speed up iterative spark jobs, but
> this capability hasn't been added to GraphX. Adding localCheckpoint to
> GraphImpl, EdgeRDDImpl, and VertexRDDImpl greatly improved the speed of a
> k-core algorithm I'm using (at the cost of fault tolerance, of course).
[jira] [Created] (SPARK-16839) CleanupAliases may leave redundant aliases at end of analysis state
Eyal Farago created SPARK-16839:
-----------------------------------

Summary: CleanupAliases may leave redundant aliases at end of analysis state
Key: SPARK-16839
URL: https://issues.apache.org/jira/browse/SPARK-16839
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.0, 1.6.1
Reporter: Eyal Farago
Priority: Minor

[SPARK-9634] [SPARK-9323] [SQL] introduced CleanupAliases, which removes unnecessary Aliases while keeping required ones, such as those in a top-level Projection and in struct attributes. This mechanism is implemented by maintaining a boolean flag during a top-down expression transformation.

I found a case where this mechanism leaves redundant aliases in the tree (within a right sibling of a create_struct node).
[jira] [Commented] (SPARK-16791) casting structs fails on Timestamp fields (interpreted mode only)
[ https://issues.apache.org/jira/browse/SPARK-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15398916#comment-15398916 ]

Eyal Farago commented on SPARK-16791:
-------------------------------------

created pull request https://github.com/apache/spark/pull/14400

> casting structs fails on Timestamp fields (interpreted mode only)
> -----------------------------------------------------------------
>
> Key: SPARK-16791
> URL: https://issues.apache.org/jira/browse/SPARK-16791
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1, 2.0.0
> Reporter: Eyal Farago
> Priority: Minor
> Labels: cast, expressions, sql
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> When casting a struct with a Timestamp field, a MatchError is thrown (pasted
> below).
> The root cause for this is in
> org.apache.spark.sql.catalyst.expressions.Cast#cast
> (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L419).
> case dt if dt == child.dataType => identity[Any]
> should be modified to:
> case dt if dt == from => identity[Any]
> It seems to explode for timestamp because
> org.apache.spark.sql.catalyst.expressions.Cast#castToTimestamp doesn't have
> an identity check or fallback case in its pattern matching.
> I'll shortly open a pull request with a failing test case and a fix.
> Caused by: scala.MatchError: TimestampType (of class org.apache.spark.sql.types.TimestampType$)
>     at org.apache.spark.sql.catalyst.expressions.Cast.castToTimestamp(Cast.scala:185)
>     at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:424)
>     at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:403)
>     at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:402)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.spark.sql.catalyst.expressions.Cast.castStruct(Cast.scala:402)
>     at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:435)
>     at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:443)
>     at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:443)
>     at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:445)
>     at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:324)
>     at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.evaluate(ExpressionEvalHelper.scala:80)
>     at org.apache.spark.sql.catalyst.expressions.CastSuite.evaluate(CastSuite.scala:33)
>     ... 58 more
[jira] [Created] (SPARK-16791) casting structs fails on Timestamp fields (interpreted mode only)
Eyal Farago created SPARK-16791:
-----------------------------------

Summary: casting structs fails on Timestamp fields (interpreted mode only)
Key: SPARK-16791
URL: https://issues.apache.org/jira/browse/SPARK-16791
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.0, 1.6.1
Reporter: Eyal Farago
Priority: Minor

When casting a struct with a Timestamp field, a MatchError is thrown (pasted below).
The root cause for this is in org.apache.spark.sql.catalyst.expressions.Cast#cast (https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L419).

case dt if dt == child.dataType => identity[Any]

should be modified to:

case dt if dt == from => identity[Any]

It seems to explode for timestamp because org.apache.spark.sql.catalyst.expressions.Cast#castToTimestamp doesn't have an identity check or fallback case in its pattern matching.

I'll shortly open a pull request with a failing test case and a fix.

Caused by: scala.MatchError: TimestampType (of class org.apache.spark.sql.types.TimestampType$)
    at org.apache.spark.sql.catalyst.expressions.Cast.castToTimestamp(Cast.scala:185)
    at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:424)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:403)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$1.apply(Cast.scala:402)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.expressions.Cast.castStruct(Cast.scala:402)
    at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$cast(Cast.scala:435)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:443)
    at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:443)
    at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:445)
    at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:324)
    at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class.evaluate(ExpressionEvalHelper.scala:80)
    at org.apache.spark.sql.catalyst.expressions.CastSuite.evaluate(CastSuite.scala:33)
    ... 58 more
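The root cause described in this issue can be modelled in a few lines. The sketch below is a toy, not Catalyst's Cast: the data types, the `cast` signature, and the `useFrom` switch are all assumptions made for illustration. `rootFrom` plays the role of `child.dataType` (the type of the whole struct), while `from` is the type of the field currently being cast.

```scala
// Toy model of a recursive cast with an identity check; not Spark code.
object CastIdentitySketch {
  sealed trait DataType
  case object IntType extends DataType
  case object TimestampType extends DataType
  final case class StructType(fields: Seq[DataType]) extends DataType

  /** Toy converter with no Timestamp -> Timestamp case, mirroring the missing fallback. */
  private def castToTimestamp(from: DataType): Any => Any = (from: @unchecked) match {
    case IntType => (v: Any) => v.asInstanceOf[Int].toLong * 1000L
  }

  /** Builds a cast function; `useFrom = true` selects the fix proposed in the issue. */
  def cast(rootFrom: DataType, from: DataType, to: DataType, useFrom: Boolean): Any => Any = {
    val identityCheck = if (useFrom) from else rootFrom
    to match {
      case dt if dt == identityCheck => identity[Any]
      case TimestampType => castToTimestamp(from)
      case StructType(toFields) =>
        val fromFields = from.asInstanceOf[StructType].fields
        // Each field is cast recursively; the identity check must see the field's own type.
        val fieldCasts = fromFields.zip(toFields).map { case (f, t) => cast(rootFrom, f, t, useFrom) }
        (v: Any) => v.asInstanceOf[Seq[Any]].zip(fieldCasts).map { case (x, c) => c(x) }
      case IntType => (v: Any) => v // toy: no real conversion to Int
    }
  }
}
```

Casting a struct of (Int, Timestamp) to a struct of (Timestamp, Timestamp) in this model throws a MatchError when the identity check compares against the root type, because the unchanged Timestamp field falls through to `castToTimestamp`. Comparing against `from` lets that field take the identity branch.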