[jira] [Updated] (SPARK-20848) Dangling threads when reading parquet files in local mode
[ https://issues.apache.org/jira/browse/SPARK-20848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nick Pritchard updated SPARK-20848:
-----------------------------------
    Attachment: Screen Shot 2017-05-22 at 4.13.52 PM.png

Screen shot of JVisualVM thread visualization.

> Dangling threads when reading parquet files in local mode
> ---------------------------------------------------------
>
>                 Key: SPARK-20848
>                 URL: https://issues.apache.org/jira/browse/SPARK-20848
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Nick Pritchard
>         Attachments: Screen Shot 2017-05-22 at 4.13.52 PM.png
>
> On each call to {{spark.read.parquet}}, a new ForkJoinPool is created. One of the threads in the pool is kept in the {{WAITING}} state and never stopped, which leads to unbounded growth in the number of threads.
> This behavior is a regression from v2.1.0.
> Reproducible example:
> {code}
> val spark = SparkSession
>   .builder()
>   .appName("test")
>   .master("local")
>   .getOrCreate()
> while (true) {
>   spark.read.parquet("/path/to/file")
>   Thread.sleep(5000)
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20848) Dangling threads when reading parquet files in local mode
Nick Pritchard created SPARK-20848:
-----------------------------------

             Summary: Dangling threads when reading parquet files in local mode
                 Key: SPARK-20848
                 URL: https://issues.apache.org/jira/browse/SPARK-20848
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, SQL
    Affects Versions: 2.1.1, 2.2.0
            Reporter: Nick Pritchard

On each call to {{spark.read.parquet}}, a new ForkJoinPool is created. One of the threads in the pool is kept in the {{WAITING}} state and never stopped, which leads to unbounded growth in the number of threads.

This behavior is a regression from v2.1.0.

Reproducible example:
{code}
val spark = SparkSession
  .builder()
  .appName("test")
  .master("local")
  .getOrCreate()

while (true) {
  spark.read.parquet("/path/to/file")
  Thread.sleep(5000)
}
{code}
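The growth can also be confirmed without JVisualVM by counting live JVM threads around the read loop. The sketch below is illustrative only (the path and iteration count are placeholders), reusing the local {{spark}} session from the example above:

```scala
// Hedged sketch: count live JVM threads to observe the leak numerically.
// Assumes the same local SparkSession `spark` as in the example above.
def liveThreads(): Int = Thread.getAllStackTraces.keySet.size

val before = liveThreads()
(1 to 10).foreach { _ =>
  spark.read.parquet("/path/to/file") // placeholder path
}
val after = liveThreads()
// On affected versions (2.1.1, 2.2.0), `after` keeps climbing with the
// iteration count, because each read leaves a ForkJoinPool thread WAITING.
println(s"live threads: before=$before, after=$after")
```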
[jira] [Resolved] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
[ https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nick Pritchard resolved SPARK-5934.
-----------------------------------
    Resolution: Not A Problem

> DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-5934
>                 URL: https://issues.apache.org/jira/browse/SPARK-5934
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Streaming
>    Affects Versions: 1.2.1
>            Reporter: Nick Pritchard
>            Priority: Minor
>
> It seems that since DStream.clearMetadata calls itself recursively on the dependencies, it attempts to unpersist the same RDD, which results in warn logs like this:
> {quote}
> WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
> {quote}
> or this:
> {quote}
> WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in either the disk, memory, or tachyon store
> {quote}
> This is preceded by logs like:
> {quote}
> DEBUG TransformedDStream: Unpersisting old RDDs: 2
> DEBUG QueueInputDStream: Unpersisting old RDDs: 2
> {quote}
> Here is a reproducible case:
> {code:scala}
> object Test {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     val queue = new mutable.Queue[RDD[Int]]
>     val input = ssc.queueStream(queue)
>     val output = input.cache().transform(x => x)
>     output.print()
>     ssc.start()
>     for (i <- 1 to 5) {
>       val rdd = ssc.sparkContext.parallelize(Seq(i))
>       queue.enqueue(rdd)
>       Thread.sleep(1000)
>     }
>     ssc.stop()
>   }
> }
> {code}
> It doesn't seem to be a fatal error, but the WARN messages are a bit unsettling.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
[ https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997335#comment-14997335 ]

Nick Pritchard commented on SPARK-5934:
---------------------------------------

[~jerryshao] thanks for your insight, I'll close this issue.

> DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-5934
>                 URL: https://issues.apache.org/jira/browse/SPARK-5934
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Streaming
>    Affects Versions: 1.2.1
>            Reporter: Nick Pritchard
>            Priority: Minor
>
> It seems that since DStream.clearMetadata calls itself recursively on the dependencies, it attempts to unpersist the same RDD, which results in warn logs like this:
> {quote}
> WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
> {quote}
> or this:
> {quote}
> WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in either the disk, memory, or tachyon store
> {quote}
> This is preceded by logs like:
> {quote}
> DEBUG TransformedDStream: Unpersisting old RDDs: 2
> DEBUG QueueInputDStream: Unpersisting old RDDs: 2
> {quote}
> Here is a reproducible case:
> {code:scala}
> object Test {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     val queue = new mutable.Queue[RDD[Int]]
>     val input = ssc.queueStream(queue)
>     val output = input.cache().transform(x => x)
>     output.print()
>     ssc.start()
>     for (i <- 1 to 5) {
>       val rdd = ssc.sparkContext.parallelize(Seq(i))
>       queue.enqueue(rdd)
>       Thread.sleep(1000)
>     }
>     ssc.stop()
>   }
> }
> {code}
> It doesn't seem to be a fatal error, but the WARN messages are a bit unsettling.
[jira] [Commented] (SPARK-11126) A memory leak in SQLListener._stageIdToStageMetrics
[ https://issues.apache.org/jira/browse/SPARK-11126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959876#comment-14959876 ]

Nick Pritchard commented on SPARK-11126:
----------------------------------------

Is there any workaround to avoid this memory leak?

> A memory leak in SQLListener._stageIdToStageMetrics
> ---------------------------------------------------
>
>                 Key: SPARK-11126
>                 URL: https://issues.apache.org/jira/browse/SPARK-11126
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Shixiong Zhu
>
> SQLListener adds all stage infos to _stageIdToStageMetrics, but only removes stage infos belonging to SQL executions.
> Reported by Terry Hoo in https://www.mail-archive.com/user@spark.apache.org/msg38810.html
[jira] [Created] (SPARK-11039) Document all UI "retained*" configurations
Nick Pritchard created SPARK-11039:
-----------------------------------

             Summary: Document all UI "retained*" configurations
                 Key: SPARK-11039
                 URL: https://issues.apache.org/jira/browse/SPARK-11039
             Project: Spark
          Issue Type: Documentation
          Components: Documentation, Web UI
    Affects Versions: 1.5.1
            Reporter: Nick Pritchard
            Priority: Trivial

Most are documented except these:
- spark.sql.ui.retainedExecutions
- spark.streaming.ui.retainedBatches

They are really helpful for managing the memory usage of the driver application.
[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted
[ https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948928#comment-14948928 ]

Nick Pritchard commented on SPARK-10942:
----------------------------------------

Thanks [~sowen] for trying! I'll let it go.

> Not all cached RDDs are unpersisted
> -----------------------------------
>
>                 Key: SPARK-10942
>                 URL: https://issues.apache.org/jira/browse/SPARK-10942
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Nick Pritchard
>            Priority: Minor
>         Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
> I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.
> I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
>     rdd
>   } else {
>     val rdd2 = rdd.map(identity)
>     rdd2.setName(rdd.first().toString)
>     rdd2.cache()
>     val rdd3 = rdd2.map(identity)
>     rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}
[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted
[ https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944478#comment-14944478 ]

Nick Pritchard commented on SPARK-10942:
----------------------------------------

Regardless, the documentation for {{spark.streaming.unpersist}} and {{spark.cleaner.ttl}} suggests that unpersisting will be handled automatically, by spark code.

> Not all cached RDDs are unpersisted
> -----------------------------------
>
>                 Key: SPARK-10942
>                 URL: https://issues.apache.org/jira/browse/SPARK-10942
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Nick Pritchard
>
> I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.
> I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
>     rdd
>   } else {
>     val rdd2 = rdd.map(identity)
>     rdd2.setName(rdd.first().toString)
>     rdd2.cache()
>     val rdd3 = rdd2.map(identity)
>     rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}
[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted
[ https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944508#comment-14944508 ]

Nick Pritchard commented on SPARK-10942:
----------------------------------------

[~rekhajoshm] Thanks for trying to reproduce it. Since you do not see the same, this is most likely an issue on my end, so I'll downgrade the priority. I am using 1.5.0, so will try 1.6.0-snapshot and also investigate the logs.

> Not all cached RDDs are unpersisted
> -----------------------------------
>
>                 Key: SPARK-10942
>                 URL: https://issues.apache.org/jira/browse/SPARK-10942
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Nick Pritchard
>         Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
> I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.
> I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
>     rdd
>   } else {
>     val rdd2 = rdd.map(identity)
>     rdd2.setName(rdd.first().toString)
>     rdd2.cache()
>     val rdd3 = rdd2.map(identity)
>     rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}
[jira] [Updated] (SPARK-10942) Not all cached RDDs are unpersisted
[ https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nick Pritchard updated SPARK-10942:
-----------------------------------
    Priority: Minor  (was: Major)

> Not all cached RDDs are unpersisted
> -----------------------------------
>
>                 Key: SPARK-10942
>                 URL: https://issues.apache.org/jira/browse/SPARK-10942
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Nick Pritchard
>            Priority: Minor
>         Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
> I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.
> I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
>     rdd
>   } else {
>     val rdd2 = rdd.map(identity)
>     rdd2.setName(rdd.first().toString)
>     rdd2.cache()
>     val rdd3 = rdd2.map(identity)
>     rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}
[jira] [Created] (SPARK-10942) Not all cached RDDs are unpersisted
Nick Pritchard created SPARK-10942:
-----------------------------------

             Summary: Not all cached RDDs are unpersisted
                 Key: SPARK-10942
                 URL: https://issues.apache.org/jira/browse/SPARK-10942
             Project: Spark
          Issue Type: Bug
          Components: Streaming
            Reporter: Nick Pritchard

I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.

I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.

I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.

{code}
val inputRDDs = mutable.Queue.tabulate(30) { i =>
  sc.parallelize(Seq(i))
}
val input: DStream[Int] = ssc.queueStream(inputRDDs)
val output = input.transform { rdd =>
  if (rdd.isEmpty()) {
    rdd
  } else {
    val rdd2 = rdd.map(identity)
    rdd2.setName(rdd.first().toString)
    rdd2.cache()
    val rdd3 = rdd2.map(identity)
    rdd3
  }
}
output.print()
ssc.start()
ssc.awaitTermination()
{code}
[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted
[ https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944477#comment-14944477 ]

Nick Pritchard commented on SPARK-10942:
----------------------------------------

[~rekhajoshm] Yes, but calling {{rdd2.unpersist()}} negates the call to {{rdd2.cache()}}, no matter where I put it in the {{transform}} closure. This is because all the operations on {{rdd2}} are lazy.

> Not all cached RDDs are unpersisted
> -----------------------------------
>
>                 Key: SPARK-10942
>                 URL: https://issues.apache.org/jira/browse/SPARK-10942
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Nick Pritchard
>
> I have a Spark Streaming application that caches RDDs inside of a {{transform}} closure. Looking at the Spark UI, it seems that most of these RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem. I run this and monitor the Spark UI "Storage" tab. The example generates and caches 30 RDDs, and I see most get cleaned up. However in the end, some still remain cached. There is some randomness going on because I see different RDDs remain cached for each run.
> I have marked this as Major because I haven't been able to work around it and it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}} but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
>     rdd
>   } else {
>     val rdd2 = rdd.map(identity)
>     rdd2.setName(rdd.first().toString)
>     rdd2.cache()
>     val rdd3 = rdd2.map(identity)
>     rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}
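The laziness point in the comment above can be illustrated in isolation. This sketch is not from the issue; {{rdd}} stands for any input RDD:

```scala
// Sketch of why unpersist() inside the closure negates cache():
// cache()/unpersist() update the persistence bookkeeping immediately,
// while the actual computation only happens at the first action.
val rdd2 = rdd.map(identity)
rdd2.cache()      // marks rdd2 for caching; nothing is computed yet
rdd2.unpersist()  // runs right away, unmarking it before any action
val rdd3 = rdd2.map(identity)
rdd3.count()      // rdd2 is computed here, but is no longer marked cached
```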
[jira] [Created] (SPARK-10875) RowMatrix.computeCovariance() result is not exactly symmetric
Nick Pritchard created SPARK-10875:
-----------------------------------

             Summary: RowMatrix.computeCovariance() result is not exactly symmetric
                 Key: SPARK-10875
                 URL: https://issues.apache.org/jira/browse/SPARK-10875
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 1.5.0
            Reporter: Nick Pritchard

For some matrices, I have seen that the computed covariance matrix is not exactly symmetric, most likely due to some numerical rounding errors. This is problematic when trying to construct an instance of {{MultivariateGaussian}}, because it requires an exactly symmetric covariance matrix. See reproducible example below.

I would suggest modifying the implementation so that {{G(i, j)}} and {{G(j, i)}} are set at the same time, with the same value.

{code}
val rdd = RandomRDDs.normalVectorRDD(sc, 100, 10, 0, 0)
val matrix = new RowMatrix(rdd)
val mean = matrix.computeColumnSummaryStatistics().mean
val cov = matrix.computeCovariance()
val dist = new MultivariateGaussian(mean, cov) // throws breeze.linalg.MatrixNotSymmetricException
{code}
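Until the implementation sets {{G(i, j)}} and {{G(j, i)}} together, a possible caller-side workaround (a sketch, not an official API) is to symmetrize the result by averaging it with its transpose before constructing the Gaussian:

```scala
// Workaround sketch: make the covariance matrix exactly symmetric by
// averaging entry (i, j) with entry (j, i). `cov` and `mean` come from
// the reproducible example above.
import org.apache.spark.mllib.linalg.Matrices

val n = cov.numRows
val g = cov.toArray // column-major: entry (i, j) is at index j * n + i
val sym = Array.tabulate(n * n) { k =>
  val i = k % n // row index
  val j = k / n // column index
  0.5 * (g(j * n + i) + g(i * n + j))
}
val symCov = Matrices.dense(n, n, sym) // symmetric by construction
val dist = new MultivariateGaussian(mean, symCov)
```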
[jira] [Created] (SPARK-10656) select(df(*)) fails when a column has special characters
Nick Pritchard created SPARK-10656:
-----------------------------------

             Summary: select(df("*")) fails when a column has special characters
                 Key: SPARK-10656
                 URL: https://issues.apache.org/jira/browse/SPARK-10656
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.0
            Reporter: Nick Pritchard

Best explained with this example:

{code}
val df = sqlContext.read.json(sqlContext.sparkContext.makeRDD(
  """{"a.b": "c", "d": "e" }""" :: Nil))
df.select("*").show()                 // successful
df.select(df("*")).show()             // throws exception
df.withColumnRenamed("d", "f").show() // also fails, possibly related
{code}
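A possible workaround to try (unverified against 1.5.0, so treat it as a guess) is to backtick-quote the problematic column name so the dot is not interpreted as a nested-field accessor, selecting the columns explicitly instead of through {{df("*")}}:

```scala
// Hypothetical workaround sketch: escape the dotted column name with
// backticks and avoid the failing df("*") form.
df.select(df("`a.b`"), df("d")).show()
```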
[jira] [Created] (SPARK-10573) IndexToString transformSchema adds output field as DoubleType
Nick Pritchard created SPARK-10573:
-----------------------------------

             Summary: IndexToString transformSchema adds output field as DoubleType
                 Key: SPARK-10573
                 URL: https://issues.apache.org/jira/browse/SPARK-10573
             Project: Spark
          Issue Type: Bug
          Components: ML
    Affects Versions: 1.5.0
            Reporter: Nick Pritchard

Reproducible example:

{code}
val stage = new IndexToString().setInputCol("input").setOutputCol("output")
val inSchema = StructType(Seq(StructField("input", DoubleType)))
val outSchema = stage.transformSchema(inSchema)
assert(outSchema("output").dataType == StringType)
{code}

The root cause seems to be that it uses {{NominalAttribute.toStructField}}, which assumes {{DoubleType}}. It would probably be better to just use {{SchemaUtils.appendColumn}} and explicitly set the data type.
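The suggested fix could look roughly like the sketch below. This is an illustration of the proposal, not an actual patch, and it assumes a {{SchemaUtils.appendColumn(schema, name, dataType)}} helper is available to the transformer:

```scala
// Sketch of the proposed fix for IndexToString.transformSchema:
// append the output column with an explicit StringType instead of
// relying on NominalAttribute.toStructField (which assumes DoubleType).
override def transformSchema(schema: StructType): StructType = {
  SchemaUtils.appendColumn(schema, $(outputCol), StringType)
}
```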
[jira] [Created] (SPARK-9141) DataFrame recomputed instead of using cached parent.
Nick Pritchard created SPARK-9141:
----------------------------------

             Summary: DataFrame recomputed instead of using cached parent
                 Key: SPARK-9141
                 URL: https://issues.apache.org/jira/browse/SPARK-9141
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.4.0, 1.4.1
            Reporter: Nick Pritchard

As I understand it, DataFrame.cache() is supposed to work the same as RDD.cache(), so that repeated operations on it will use the cached results and not recompute the entire lineage. However, it seems that some DataFrame operations (e.g. withColumn) change the underlying RDD lineage so that cache doesn't work as expected.

Below is a Scala example that demonstrates this. First, I define two UDFs that use println so that it is easy to see when they are being called. Next, I create a simple data frame with one row and two columns. Next, I add a column, cache it, and call count() to force the computation. Lastly, I add another column, cache it, and call count(). I would have expected the last statement to only compute the last column, since everything else was cached. However, because withColumn() changes the lineage, the whole data frame is recomputed.

{code:scala}
// Example udf's that println when called
val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }

// Initial dataset
val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")

// Add column by applying twice udf
val df2 = df1.withColumn("twice", twice($"value"))
df2.cache()
df2.count() // prints Computed: twice(1)

// Add column by applying triple udf
val df3 = df2.withColumn("triple", triple($"value"))
df3.cache()
df3.count() // prints Computed: twice(1)\nComputed: triple(1)
{code}

I found a workaround, which helped me understand what was going on behind the scenes, but doesn't seem like an ideal solution. Basically, I convert to RDD and then back to DataFrame, which seems to freeze the lineage. The code below shows the workaround for creating the second data frame so cache will work as expected.

{code:scala}
val df2 = {
  val tmp = df1.withColumn("twice", twice($"value"))
  sqlContext.createDataFrame(tmp.rdd, tmp.schema)
}
{code}
[jira] [Commented] (SPARK-8521) Feature Transformers in 1.5
[ https://issues.apache.org/jira/browse/SPARK-8521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14612317#comment-14612317 ]

Nick Pritchard commented on SPARK-8521:
---------------------------------------

How about a transformer that performs the opposite of StringIndexer? In other words, it would convert a Double to a String using the labels metadata. This would be useful for transforming the output of Predictors for external usage.

> Feature Transformers in 1.5
> ---------------------------
>
>                 Key: SPARK-8521
>                 URL: https://issues.apache.org/jira/browse/SPARK-8521
>             Project: Spark
>          Issue Type: Umbrella
>          Components: ML
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>
> This is a list of feature transformers we plan to add in Spark 1.5. Feel free to propose useful transformers that are not on the list.
[jira] [Created] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
Nick Pritchard created SPARK-5934:
----------------------------------

             Summary: DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
                 Key: SPARK-5934
                 URL: https://issues.apache.org/jira/browse/SPARK-5934
             Project: Spark
          Issue Type: Bug
          Components: Block Manager, Streaming
    Affects Versions: 1.2.1
            Reporter: Nick Pritchard
            Priority: Minor

It seems that since DStream.clearMetadata calls itself recursively on the dependencies, it attempts to unpersist the same RDD, which results in warn logs like this:

{quote}
WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
{quote}

or this:

{quote}
WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in either the disk, memory, or tachyon store
{quote}

This is preceded by logs like:

{quote}
DEBUG TransformedDStream: Unpersisting old RDDs: 2
DEBUG QueueInputDStream: Unpersisting old RDDs: 2
{quote}

Here is a reproducible case:

{code:scala}
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
    val ssc = new StreamingContext(conf, Seconds(1))
    val queue = new mutable.Queue[RDD[Int]]
    val input = ssc.queueStream(queue)
    val output = input.cache().transform(x => x)
    output.print()
    ssc.start()
    for (i <- 1 to 5) {
      val rdd = ssc.sparkContext.parallelize(Seq(i))
      queue.enqueue(rdd)
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
{code}

It doesn't seem to be a fatal error, but the WARN messages are a bit unsettling.