Re: Shrinking the DataFrame lineage
Sorry for the slow response.

I agree with Hamel on #1.

GraphFrames are mostly wrappers for GraphX algorithms. There are a few which are not:
* BFS: This is an iterative DataFrame algorithm. Though it has unit tests, I have not pushed it at scale to see how far it can go.
* Belief Propagation example: This uses the conversion to and from an RDD. Not great, but it's really just an example for now.

I definitely want to get this issue fixed ASAP!

On Sun, May 15, 2016 at 7:15 AM, Hamel Kothari <hamelkoth...@gmail.com> wrote:
> I don't know about the second one, but for question #1:
> When you convert from a cached DF to an RDD (via a map function or the
> "rdd" value), the types are converted from the off-heap types to on-heap
> types. If your rows are fairly large/complex, this can have a pretty big
> performance impact, so I would watch out for that.
Re: Shrinking the DataFrame lineage
I don't know about the second one, but for question #1: when you convert from a cached DF to an RDD (via a map function or the "rdd" value), the types are converted from the off-heap types to on-heap types. If your rows are fairly large/complex, this can have a pretty big performance impact, so I would watch out for that.

On Fri, May 13, 2016 at 5:29 PM Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
> 1) Suppose I have the original DataFrame in Tungsten, i.e., Catalyst types
> cached in the off-heap store. It might be quite useful for iterative
> workloads due to lower GC overhead. Then I convert it to an RDD and then
> back to a DF. Will the resulting DF remain off-heap, or will it be on-heap
> like a regular RDD?
>
> 2) How is the mentioned problem handled in GraphFrames? Suppose I want to
> use aggregateMessages in an iterative loop to implement PageRank.
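Hamel's two conversion paths can be sketched as follows. This is a minimal illustration, assuming the Spark 2.x+ `SparkSession` API rather than the 1.6 `sqlContext` used in this thread (in 1.6, `DataFrame.map` itself returned an RDD; here the same effect is spelled `df.rdd.map`). The object and method names are hypothetical. The point is only that both paths hand your code ordinary `Row` objects on the JVM heap, which is where the cost he describes comes from; the sketch does not measure that cost.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Hypothetical names; a sketch of the two DataFrame-to-RDD paths, not a benchmark.
object RowConversion {

  // Path 1, the `rdd` value: every element is an external Row, i.e. an ordinary
  // JVM object converted from Spark SQL's internal row representation.
  def asRowRdd(df: DataFrame): RDD[Row] = df.rdd

  // Path 2, a map over the rows: the closure also receives converted Row
  // objects, so every value it reads has been copied onto the JVM heap.
  def firstColumnAsInts(df: DataFrame): Array[Int] =
    df.rdd.map(row => row.getInt(0)).collect()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("row-conversion").getOrCreate()
    import spark.implicits._

    // A small DataFrame held in Spark SQL's in-memory cache.
    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "label").cache()
    df.count() // materialize the cache

    asRowRdd(df).take(2).foreach(println)
    println(firstColumnAsInts(df).mkString(", "))
    spark.stop()
  }
}
```

With rows this small the conversion is cheap; Hamel's warning is about wide or deeply nested rows, where building one `Row` object per record adds allocation and GC pressure.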
RE: Shrinking the DataFrame lineage
Hi Joseph,

Thank you for the link! Two follow-up questions:

1) Suppose I have the original DataFrame in Tungsten, i.e., Catalyst types cached in the off-heap store. It might be quite useful for iterative workloads due to lower GC overhead. Then I convert it to an RDD and then back to a DF. Will the resulting DF remain off-heap, or will it be on-heap like a regular RDD?

2) How is the mentioned problem handled in GraphFrames? Suppose I want to use aggregateMessages in an iterative loop to implement PageRank.

Best regards, Alexander

From: Joseph Bradley [mailto:jos...@databricks.com]
Sent: Friday, May 13, 2016 12:38 PM
To: Ulanov, Alexander <alexander.ula...@hpe.com>
Cc: dev@spark.apache.org
Subject: Re: Shrinking the DataFrame lineage

Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but hacks can get around it: convert the DataFrame to an RDD and back to truncate the query plan lineage.

Joseph
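For question 2, here is a rough sketch of an iterative PageRank written directly with DataFrame joins and `groupBy`, standing in for GraphFrames' `aggregateMessages` (it does not use GraphFrames). It combines the persist-then-count pattern from the original message with the RDD round trip suggested in this thread, applied every `truncateEvery` iterations. Assumptions: the Spark 2.x+ API; `pageRank`, `truncateLineage`, and `truncateEvery` are hypothetical names; the update is the simple unnormalized `0.15 + 0.85 * incoming` form with no special handling of dangling vertices.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, count, lit, sum}

// Sketch only: hypothetical names, plain DataFrames instead of GraphFrames.
object IterativePageRankSketch {

  // Hypothetical helper: the RDD round trip from this thread. The returned
  // DataFrame's logical plan is a single leaf over df.rdd.
  def truncateLineage(df: DataFrame): DataFrame =
    df.sparkSession.createDataFrame(df.rdd, df.schema)

  // edges: columns (src, dst). Returns columns (id, rank).
  def pageRank(edges: DataFrame, iterations: Int, truncateEvery: Int): DataFrame = {
    // Out-degree per source vertex and the vertex set, reused every iteration
    // (left cached here; unpersist them when done).
    val outDeg = edges.groupBy("src").agg(count(lit(1)).as("deg")).cache()
    val vertices = edges.select(col("src").as("id"))
      .union(edges.select(col("dst").as("id")))
      .distinct()
      .cache()

    var ranks: DataFrame = vertices.withColumn("rank", lit(1.0)).cache()
    ranks.count() // materialize, like RDD.persist followed by RDD.count

    for (i <- 1 to iterations) {
      // "Send" rank / outDegree along every edge and sum the messages per target.
      val contribs = edges
        .join(ranks.withColumnRenamed("id", "src"), "src")
        .join(outDeg, "src")
        .groupBy("dst")
        .agg(sum(col("rank") / col("deg")).as("incoming"))
        .withColumnRenamed("dst", "id")

      // Vertices that received no messages keep only the 0.15 term.
      val step = vertices
        .join(contribs, Seq("id"), "left_outer")
        .select(col("id"),
          (lit(0.15) + lit(0.85) * coalesce(col("incoming"), lit(0.0))).as("rank"))

      // Every `truncateEvery` iterations, cut the query plan with the round trip.
      val next =
        (if (truncateEvery > 0 && i % truncateEvery == 0) truncateLineage(step) else step).cache()
      next.count()      // fill this iteration's cache before dropping the old one
      ranks.unpersist() // previous iteration's cached copy is no longer needed
      ranks = next
    }
    ranks
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("pagerank-sketch").getOrCreate()
    import spark.implicits._
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 1L), (3L, 2L)).toDF("src", "dst")
    pageRank(edges, iterations = 10, truncateEvery = 3).orderBy("id").show()
    spark.stop()
  }
}
```

Materializing `next` before unpersisting `ranks` matters: the new cached copy is filled while the previous one is still available. The round trip only resets the query plan, and per Hamel's caveat each truncation also pays the cost of converting rows out of Spark SQL's internal format, so `truncateEvery` trades plan size against that conversion cost.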
Re: Shrinking the DataFrame lineage
Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but hacks can get around it: convert the DataFrame to an RDD and back to truncate the query plan lineage.

Joseph

On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
> Dear Spark developers,
>
> Recently, I was trying to switch my code from RDDs to DataFrames in order
> to compare the performance. The code computes an RDD in a loop. I use
> RDD.persist followed by RDD.count to force Spark to compute the RDD and
> cache it, so that it does not need to be recomputed on each iteration.
> However, this does not seem to work for DataFrames:
>
> import scala.util.Random
> val rdd = sc.parallelize(1 to 10, 2).map(_ => (Random.nextInt(5), Random.nextInt(5)))
> val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
> val vertices =
>   edges.select("from").unionAll(edges.select("to")).distinct().cache()
> vertices.count
>
> [Stage 34:=> (65 + 4) / 200]
> [Stage 34:> (90 + 5) / 200]
> [Stage 34:==> (114 + 4) / 200]
> [Stage 34:> (137 + 4) / 200]
> [Stage 34:==> (157 + 4) / 200]
> [Stage 34:=> (182 + 4) / 200]
> res25: Long = 5
>
> If I run count again, it recomputes the result instead of using the cached
> one:
>
> scala> vertices.count
> [Stage 37:=> (49 + 4) / 200]
> [Stage 37:==> (66 + 4) / 200]
> [Stage 37:> (90 + 4) / 200]
> [Stage 37:=> (110 + 4) / 200]
> [Stage 37:===> (133 + 4) / 200]
> [Stage 37:==> (157 + 4) / 200]
> [Stage 37:> (178 + 5) / 200]
> res26: Long = 5
>
> Could you suggest how to shrink the DataFrame lineage?
>
> Best regards, Alexander
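The RDD round trip Joseph suggests can be sketched as follows, together with a crude way to watch the logical plan grow and then shrink. Assumptions: the Spark 2.x+ API (`SparkSession`, `Dataset.queryExecution`); `truncateLineage`, `planSize`, and `grow` are hypothetical helpers, and counting lines of `treeString` is only a rough proxy for plan size.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helpers illustrating the round-trip hack; not a Spark API.
object LineageTruncation {

  // Rebuild the DataFrame from its RDD[Row] and schema. The new logical plan is
  // a single leaf instead of the full chain of transformations.
  def truncateLineage(df: DataFrame): DataFrame =
    df.sparkSession.createDataFrame(df.rdd, df.schema)

  // Rough size measure: number of lines in the unanalyzed logical plan.
  def planSize(df: DataFrame): Int =
    df.queryExecution.logical.treeString.split("\n").length

  // Apply `steps` cheap updates to an existing column "x"; each adds a plan node.
  def grow(df: DataFrame, steps: Int): DataFrame =
    (1 to steps).foldLeft(df)((acc, _) => acc.withColumn("x", col("x") + 1))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("lineage-truncation").getOrCreate()
    import spark.implicits._

    val grown = grow(Seq(1, 2, 3).toDF("x"), 50)
    val truncated = truncateLineage(grown).cache()
    truncated.count() // materialize, as with RDD.persist followed by RDD.count

    println(s"plan lines before: ${planSize(grown)}, after: ${planSize(truncated)}")
    spark.stop()
  }
}
```

The round trip resets the query plan that Catalyst has to analyze and optimize; the RDD underneath `df.rdd` still carries the full computation, so caching and materializing the result (as in `main`) is what keeps later iterations from recomputing it.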