[jira] [Updated] (SPARK-20848) Dangling threads when reading parquet files in local mode

2017-05-22 Thread Nick Pritchard (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Pritchard updated SPARK-20848:
---
Attachment: Screen Shot 2017-05-22 at 4.13.52 PM.png

Screenshot of the JVisualVM thread visualization.

> Dangling threads when reading parquet files in local mode
> -
>
> Key: SPARK-20848
> URL: https://issues.apache.org/jira/browse/SPARK-20848
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, SQL
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Nick Pritchard
> Attachments: Screen Shot 2017-05-22 at 4.13.52 PM.png
>
>
> On each call to {{spark.read.parquet}}, a new ForkJoinPool is created. One of
> the threads in the pool remains in the {{WAITING}} state and is never stopped,
> which leads to unbounded growth in the number of threads.
> This behavior is a regression from v2.1.0.
> Reproducible example:
> {code}
> val spark = SparkSession
>   .builder()
>   .appName("test")
>   .master("local")
>   .getOrCreate()
> while(true) {
>   spark.read.parquet("/path/to/file")
>   Thread.sleep(5000)
> }
> {code}






[jira] [Created] (SPARK-20848) Dangling threads when reading parquet files in local mode

2017-05-22 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-20848:
--

 Summary: Dangling threads when reading parquet files in local mode
 Key: SPARK-20848
 URL: https://issues.apache.org/jira/browse/SPARK-20848
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, SQL
Affects Versions: 2.1.1, 2.2.0
Reporter: Nick Pritchard


On each call to {{spark.read.parquet}}, a new ForkJoinPool is created. One of
the threads in the pool remains in the {{WAITING}} state and is never stopped,
which leads to unbounded growth in the number of threads.

This behavior is a regression from v2.1.0.

Reproducible example:
{code}
val spark = SparkSession
  .builder()
  .appName("test")
  .master("local")
  .getOrCreate()
while(true) {
  spark.read.parquet("/path/to/file")
  Thread.sleep(5000)
}
{code}
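
For anyone who wants to confirm the leak, below is a minimal observation sketch (assuming the same local-mode SparkSession as above): it counts live JVM threads around each read, and under the buggy behavior the count grows without bound.

{code}
// Hedged sketch: observe the dangling-thread growth described above.
// Assumes the local SparkSession from the reproducible example.
for (i <- 1 to 10) {
  spark.read.parquet("/path/to/file")
  val liveThreads = Thread.getAllStackTraces.keySet.size
  println(s"iteration $i: $liveThreads live threads")
  Thread.sleep(5000)
}
{code}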







[jira] [Resolved] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times

2015-11-09 Thread Nick Pritchard (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Pritchard resolved SPARK-5934.
---
Resolution: Not A Problem

> DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
> 
>
> Key: SPARK-5934
> URL: https://issues.apache.org/jira/browse/SPARK-5934
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Streaming
>Affects Versions: 1.2.1
>Reporter: Nick Pritchard
>Priority: Minor
>
> It seems that, because DStream.clearMetadata calls itself recursively on the
> dependencies, it attempts to unpersist the same RDD multiple times, which
> results in warning logs like this:
> {quote}
> WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
> {quote}
> or this:
> {quote}
> WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in 
> either the disk, memory, or tachyon store
> {quote}
> This is preceded by logs like:
> {quote}
> DEBUG TransformedDStream: Unpersisting old RDDs: 2
> DEBUG QueueInputDStream: Unpersisting old RDDs: 2
> {quote}
> Here is a reproducible case:
> {code:scala}
> object Test {
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
> val ssc = new StreamingContext(conf, Seconds(1))
> val queue = new mutable.Queue[RDD[Int]]
> val input = ssc.queueStream(queue)
> val output = input.cache().transform(x => x)
> output.print()
> ssc.start()
> for (i <- 1 to 5) {
>   val rdd = ssc.sparkContext.parallelize(Seq(i))
>   queue.enqueue(rdd)
>   Thread.sleep(1000)
> }
> ssc.stop()
>   }
> }
> {code}
> It doesn't seem to be a fatal error, but the WARN messages are a bit 
> unsettling.






[jira] [Commented] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times

2015-11-09 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14997335#comment-14997335
 ] 

Nick Pritchard commented on SPARK-5934:
---

[~jerryshao] thanks for your insight, I'll close this issue.

> DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
> 
>
> Key: SPARK-5934
> URL: https://issues.apache.org/jira/browse/SPARK-5934
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Streaming
>Affects Versions: 1.2.1
>Reporter: Nick Pritchard
>Priority: Minor
>
> It seems that, because DStream.clearMetadata calls itself recursively on the
> dependencies, it attempts to unpersist the same RDD multiple times, which
> results in warning logs like this:
> {quote}
> WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
> {quote}
> or this:
> {quote}
> WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in 
> either the disk, memory, or tachyon store
> {quote}
> This is preceded by logs like:
> {quote}
> DEBUG TransformedDStream: Unpersisting old RDDs: 2
> DEBUG QueueInputDStream: Unpersisting old RDDs: 2
> {quote}
> Here is a reproducible case:
> {code:scala}
> object Test {
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
> val ssc = new StreamingContext(conf, Seconds(1))
> val queue = new mutable.Queue[RDD[Int]]
> val input = ssc.queueStream(queue)
> val output = input.cache().transform(x => x)
> output.print()
> ssc.start()
> for (i <- 1 to 5) {
>   val rdd = ssc.sparkContext.parallelize(Seq(i))
>   queue.enqueue(rdd)
>   Thread.sleep(1000)
> }
> ssc.stop()
>   }
> }
> {code}
> It doesn't seem to be a fatal error, but the WARN messages are a bit 
> unsettling.






[jira] [Commented] (SPARK-11126) A memory leak in SQLListener._stageIdToStageMetrics

2015-10-15 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959876#comment-14959876
 ] 

Nick Pritchard commented on SPARK-11126:


Is there any workaround to avoid this memory leak?

> A memory leak in SQLListener._stageIdToStageMetrics
> ---
>
> Key: SPARK-11126
> URL: https://issues.apache.org/jira/browse/SPARK-11126
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Shixiong Zhu
>
> SQLListener adds all stage infos to _stageIdToStageMetrics, but only removes  
> stage infos belonging to SQL executions.
> Reported by Terry Hoo in 
> https://www.mail-archive.com/user@spark.apache.org/msg38810.html
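
As for a workaround: a hedged mitigation sketch is to shrink what the UI listeners retain. These keys are real Spark settings, but whether they bound {{_stageIdToStageMetrics}} for non-SQL stages is exactly what this ticket questions.

{code}
// Hedged mitigation sketch: reduce what the UI listeners retain.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.sql.ui.retainedExecutions", "10") // retain fewer SQL executions
  .set("spark.ui.retainedStages", "100")        // retain fewer stages in the UI
{code}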






[jira] [Created] (SPARK-11039) Document all UI "retained*" configurations

2015-10-09 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-11039:
--

 Summary: Document all UI "retained*" configurations
 Key: SPARK-11039
 URL: https://issues.apache.org/jira/browse/SPARK-11039
 Project: Spark
  Issue Type: Documentation
  Components: Documentation, Web UI
Affects Versions: 1.5.1
Reporter: Nick Pritchard
Priority: Trivial


Most are documented except these:
- spark.sql.ui.retainedExecutions
- spark.streaming.ui.retainedBatches

They are really helpful for managing the memory usage of the driver application.
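
For reference, a sketch of how they might appear in {{spark-defaults.conf}} (values illustrative only):

{code}
spark.sql.ui.retainedExecutions    50
spark.streaming.ui.retainedBatches 100
{code}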






[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-08 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948928#comment-14948928
 ] 

Nick Pritchard commented on SPARK-10942:


Thanks [~sowen] for trying! I'll let it go.

> Not all cached RDDs are unpersisted
> ---
>
> Key: SPARK-10942
> URL: https://issues.apache.org/jira/browse/SPARK-10942
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Nick Pritchard
>Priority: Minor
> Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
>
> I have a Spark Streaming application that caches RDDs inside a
> {{transform}} closure. Looking at the Spark UI, it seems that most of these
> RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem.
> I run this and monitor the Spark UI "Storage" tab. The example generates and
> caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
> remain cached. There is some randomness involved, because different RDDs
> remain cached on each run.
> I have marked this as Major because I haven't been able to work around it and
> it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
> but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
> rdd
>   } else {
> val rdd2 = rdd.map(identity)
> rdd2.setName(rdd.first().toString)
> rdd2.cache()
> val rdd3 = rdd2.map(identity)
> rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}






[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-05 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944478#comment-14944478
 ] 

Nick Pritchard commented on SPARK-10942:


Regardless, the documentation for {{spark.streaming.unpersist}} and
{{spark.cleaner.ttl}} suggests that unpersisting will be handled automatically
by Spark code.

> Not all cached RDDs are unpersisted
> ---
>
> Key: SPARK-10942
> URL: https://issues.apache.org/jira/browse/SPARK-10942
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Nick Pritchard
>
> I have a Spark Streaming application that caches RDDs inside a
> {{transform}} closure. Looking at the Spark UI, it seems that most of these
> RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem.
> I run this and monitor the Spark UI "Storage" tab. The example generates and
> caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
> remain cached. There is some randomness involved, because different RDDs
> remain cached on each run.
> I have marked this as Major because I haven't been able to work around it and
> it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
> but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
> rdd
>   } else {
> val rdd2 = rdd.map(identity)
> rdd2.setName(rdd.first().toString)
> rdd2.cache()
> val rdd3 = rdd2.map(identity)
> rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}






[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-05 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944508#comment-14944508
 ] 

Nick Pritchard commented on SPARK-10942:


[~rekhajoshm] Thanks for trying to reproduce it. Since you do not see the
same behavior, this is most likely an issue on my end, so I'll downgrade the
priority. I am using 1.5.0, so I will try 1.6.0-SNAPSHOT and also investigate
the logs.

> Not all cached RDDs are unpersisted
> ---
>
> Key: SPARK-10942
> URL: https://issues.apache.org/jira/browse/SPARK-10942
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Nick Pritchard
> Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
>
> I have a Spark Streaming application that caches RDDs inside a
> {{transform}} closure. Looking at the Spark UI, it seems that most of these
> RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem.
> I run this and monitor the Spark UI "Storage" tab. The example generates and
> caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
> remain cached. There is some randomness involved, because different RDDs
> remain cached on each run.
> I have marked this as Major because I haven't been able to work around it and
> it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
> but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
> rdd
>   } else {
> val rdd2 = rdd.map(identity)
> rdd2.setName(rdd.first().toString)
> rdd2.cache()
> val rdd3 = rdd2.map(identity)
> rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}






[jira] [Updated] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-05 Thread Nick Pritchard (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Pritchard updated SPARK-10942:
---
Priority: Minor  (was: Major)

> Not all cached RDDs are unpersisted
> ---
>
> Key: SPARK-10942
> URL: https://issues.apache.org/jira/browse/SPARK-10942
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Nick Pritchard
>Priority: Minor
> Attachments: SPARK-10942_1.png, SPARK-10942_2.png, SPARK-10942_3.png
>
>
> I have a Spark Streaming application that caches RDDs inside a
> {{transform}} closure. Looking at the Spark UI, it seems that most of these
> RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem.
> I run this and monitor the Spark UI "Storage" tab. The example generates and
> caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
> remain cached. There is some randomness involved, because different RDDs
> remain cached on each run.
> I have marked this as Major because I haven't been able to work around it and
> it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
> but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
> rdd
>   } else {
> val rdd2 = rdd.map(identity)
> rdd2.setName(rdd.first().toString)
> rdd2.cache()
> val rdd3 = rdd2.map(identity)
> rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}






[jira] [Created] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-05 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-10942:
--

 Summary: Not all cached RDDs are unpersisted
 Key: SPARK-10942
 URL: https://issues.apache.org/jira/browse/SPARK-10942
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Reporter: Nick Pritchard


I have a Spark Streaming application that caches RDDs inside a {{transform}}
closure. Looking at the Spark UI, it seems that most of these RDDs are
unpersisted after the batch completes, but not all.

I have copied a minimal reproducible example below to highlight the problem. I
run this and monitor the Spark UI "Storage" tab. The example generates and
caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
remain cached. There is some randomness involved, because different RDDs
remain cached on each run.

I have marked this as Major because I haven't been able to work around it and
it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
but that did not change anything.

{code}
val inputRDDs = mutable.Queue.tabulate(30) { i =>
  sc.parallelize(Seq(i))
}
val input: DStream[Int] = ssc.queueStream(inputRDDs)

val output = input.transform { rdd =>
  if (rdd.isEmpty()) {
rdd
  } else {
val rdd2 = rdd.map(identity)
rdd2.setName(rdd.first().toString)
rdd2.cache()
val rdd3 = rdd2.map(identity)
rdd3
  }
}
output.print()

ssc.start()
ssc.awaitTermination()
{code}






[jira] [Commented] (SPARK-10942) Not all cached RDDs are unpersisted

2015-10-05 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944477#comment-14944477
 ] 

Nick Pritchard commented on SPARK-10942:


[~rekhajoshm] Yes, but calling {{rdd2.unpersist()}} negates the call to 
{{rdd2.cache()}}, no matter where I put it in the {{transform}} closure. This 
is because all the operations on {{rdd2}} are lazy.
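
To make the laziness point concrete, here is a sketch of what happens when the unpersist is placed inside the closure (illustrative only):

{code}
// Inside the transform closure (sketch):
val rdd2 = rdd.map(identity)
rdd2.cache()      // only marks rdd2 for caching; nothing is computed yet
rdd2.unpersist()  // immediately clears that mark, so by the time the batch
                  // actually runs, rdd2 is never cached at all
{code}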

> Not all cached RDDs are unpersisted
> ---
>
> Key: SPARK-10942
> URL: https://issues.apache.org/jira/browse/SPARK-10942
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Nick Pritchard
>
> I have a Spark Streaming application that caches RDDs inside a
> {{transform}} closure. Looking at the Spark UI, it seems that most of these
> RDDs are unpersisted after the batch completes, but not all.
> I have copied a minimal reproducible example below to highlight the problem.
> I run this and monitor the Spark UI "Storage" tab. The example generates and
> caches 30 RDDs, and I see most get cleaned up. However, in the end, some still
> remain cached. There is some randomness involved, because different RDDs
> remain cached on each run.
> I have marked this as Major because I haven't been able to work around it and
> it is a memory leak for my application. I tried setting {{spark.cleaner.ttl}},
> but that did not change anything.
> {code}
> val inputRDDs = mutable.Queue.tabulate(30) { i =>
>   sc.parallelize(Seq(i))
> }
> val input: DStream[Int] = ssc.queueStream(inputRDDs)
> val output = input.transform { rdd =>
>   if (rdd.isEmpty()) {
> rdd
>   } else {
> val rdd2 = rdd.map(identity)
> rdd2.setName(rdd.first().toString)
> rdd2.cache()
> val rdd3 = rdd2.map(identity)
> rdd3
>   }
> }
> output.print()
> ssc.start()
> ssc.awaitTermination()
> {code}






[jira] [Created] (SPARK-10875) RowMatrix.computeCovariance() result is not exactly symmetric

2015-09-29 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-10875:
--

 Summary: RowMatrix.computeCovariance() result is not exactly 
symmetric
 Key: SPARK-10875
 URL: https://issues.apache.org/jira/browse/SPARK-10875
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 1.5.0
Reporter: Nick Pritchard


For some matrices, I have seen that the computed covariance matrix is not 
exactly symmetric, most likely due to some numerical rounding errors. This is 
problematic when trying to construct an instance of {{MultivariateGaussian}}, 
because it requires an exactly symmetric covariance matrix. See reproducible 
example below.

I would suggest modifying the implementation so that {{G(i, j)}} and {{G(j, 
i)}} are set at the same time, with the same value.

{code}
val rdd = RandomRDDs.normalVectorRDD(sc, 100, 10, 0, 0)
val matrix = new RowMatrix(rdd)
val mean = matrix.computeColumnSummaryStatistics().mean
val cov = matrix.computeCovariance()
val dist = new MultivariateGaussian(mean, cov) //throws 
breeze.linalg.MatrixNotSymmetricException
{code}
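
As a stop-gap, one workaround sketch is to symmetrize the result by averaging {{G(i, j)}} and {{G(j, i)}} before constructing the Gaussian; this is illustrative only, not the proposed fix itself:

{code}
// Hedged workaround sketch: force exact symmetry by averaging with the transpose.
import org.apache.spark.mllib.linalg.DenseMatrix

val n = cov.numRows
val values = Array.ofDim[Double](n * n)
for (i <- 0 until n; j <- 0 until n) {
  values(i + j * n) = (cov(i, j) + cov(j, i)) / 2.0 // column-major layout
}
val symCov = new DenseMatrix(n, n, values)
val dist = new MultivariateGaussian(mean, symCov) // no longer throws
{code}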






[jira] [Created] (SPARK-10656) select(df(*)) fails when a column has special characters

2015-09-16 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-10656:
--

 Summary: select(df(*)) fails when a column has special characters
 Key: SPARK-10656
 URL: https://issues.apache.org/jira/browse/SPARK-10656
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.0
Reporter: Nick Pritchard


Best explained with this example:
{code}
val df = sqlContext.read.json(sqlContext.sparkContext.makeRDD(
  """{"a.b": "c", "d": "e" }""" :: Nil))
df.select("*").show() //successful
df.select(df("*")).show() //throws exception
df.withColumnRenamed("d", "f").show() //also fails, possibly related
{code}
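
As a possible workaround (untested sketch), escaping the dotted name with backticks when referencing individual columns should keep the analyzer from treating {{a.b}} as a nested field access:

{code}
df.select(df("`a.b`"), df("d")).show() // backticks quote the literal column name
{code}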






[jira] [Created] (SPARK-10573) IndexToString transformSchema adds output field as DoubleType

2015-09-11 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-10573:
--

 Summary: IndexToString transformSchema adds output field as 
DoubleType
 Key: SPARK-10573
 URL: https://issues.apache.org/jira/browse/SPARK-10573
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 1.5.0
Reporter: Nick Pritchard


Reproducible example:
{code}
val stage = new IndexToString().setInputCol("input").setOutputCol("output")
val inSchema = StructType(Seq(StructField("input", DoubleType)))
val outSchema = stage.transformSchema(inSchema)
assert(outSchema("output").dataType == StringType)
{code}

The root cause seems to be that it uses {{NominalAttribute.toStructField}} 
which assumes {{DoubleType}}. It would probably be better to just use 
{{SchemaUtils.appendColumn}} and explicitly set the data type.
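
A sketch of that suggestion (hedged: {{SchemaUtils}} is an internal Spark helper, and this fragment only illustrates the proposed body of {{IndexToString.transformSchema}}):

{code}
override def transformSchema(schema: StructType): StructType = {
  // Append the output column with an explicit StringType instead of relying
  // on NominalAttribute.toStructField, which assumes DoubleType.
  SchemaUtils.appendColumn(schema, $(outputCol), StringType)
}
{code}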







[jira] [Created] (SPARK-9141) DataFrame recomputed instead of using cached parent.

2015-07-17 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-9141:
-

 Summary: DataFrame recomputed instead of using cached parent.
 Key: SPARK-9141
 URL: https://issues.apache.org/jira/browse/SPARK-9141
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.0, 1.4.1
Reporter: Nick Pritchard


As I understand it, DataFrame.cache() is supposed to work the same way as
RDD.cache(): repeated operations on it should use the cached results rather
than recompute the entire lineage. However, it seems that some DataFrame
operations (e.g. withColumn) change the underlying RDD lineage so that cache
doesn't work as expected.

Below is a Scala example that demonstrates this. First, I define two UDFs that
use println so that it is easy to see when they are called. Next, I create a
simple data frame with one row and two columns. Then I add a column, cache the
result, and call count() to force the computation. Lastly, I add another
column, cache it, and call count() again.

I would have expected the last statement to only compute the last column,
since everything else was cached. However, because withColumn() changes the
lineage, the whole data frame is recomputed.

{code:scala}
// Example UDFs that println when called
val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }

// Initial dataset
val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")

// Add column by applying the twice udf
val df2 = df1.withColumn("twice", twice($"value"))
df2.cache()
df2.count() // prints "Computed: twice(1)"

// Add column by applying the triple udf
val df3 = df2.withColumn("triple", triple($"value"))
df3.cache()
df3.count() // prints "Computed: twice(1)" and "Computed: triple(1)"
{code}

I found a workaround, which helped me understand what was going on behind the
scenes, but it doesn't seem like an ideal solution. Basically, I convert to an
RDD and then back to a DataFrame, which seems to freeze the lineage. The code
below shows the workaround for creating the second data frame so that cache
works as expected.

{code:scala}
val df2 = {
  val tmp = df1.withColumn("twice", twice($"value"))
  sqlContext.createDataFrame(tmp.rdd, tmp.schema)
}
{code}







[jira] [Commented] (SPARK-8521) Feature Transformers in 1.5

2015-07-02 Thread Nick Pritchard (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14612317#comment-14612317
 ] 

Nick Pritchard commented on SPARK-8521:
---

How about a transformer that performs the opposite of StringIndexer? In other 
words, it would convert a Double to String using the labels metadata. This 
would be useful for transforming the output of Predictors for external usage.
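
A usage sketch of such a transformer (this is the shape {{IndexToString}} eventually took; the fitted {{labelIndexerModel}} here is an assumed {{StringIndexerModel}}):

{code}
// Hypothetical usage: invert a StringIndexer using its labels metadata.
val converter = new IndexToString()
  .setInputCol("prediction")           // numeric output of a Predictor
  .setOutputCol("predictedLabel")      // recovered string label
  .setLabels(labelIndexerModel.labels) // labels from the assumed StringIndexerModel
{code}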

> Feature Transformers in 1.5
> ---
>
> Key: SPARK-8521
> URL: https://issues.apache.org/jira/browse/SPARK-8521
> Project: Spark
>  Issue Type: Umbrella
>  Components: ML
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>
> This is a list of feature transformers we plan to add in Spark 1.5. Feel free
> to propose useful transformers that are not on the list.






[jira] [Created] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times

2015-02-20 Thread Nick Pritchard (JIRA)
Nick Pritchard created SPARK-5934:
-

 Summary: DStreamGraph.clearMetadata attempts to unpersist the same 
RDD multiple times
 Key: SPARK-5934
 URL: https://issues.apache.org/jira/browse/SPARK-5934
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Streaming
Affects Versions: 1.2.1
Reporter: Nick Pritchard
Priority: Minor


It seems that, because DStream.clearMetadata calls itself recursively on the
dependencies, it attempts to unpersist the same RDD multiple times, which
results in warning logs like this:
{quote}
WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
{quote}

or this:
{quote}
WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in 
either the disk, memory, or tachyon store
{quote}

This is preceded by logs like:
{quote}
DEBUG TransformedDStream: Unpersisting old RDDs: 2
DEBUG QueueInputDStream: Unpersisting old RDDs: 2
{quote}

Here is a reproducible case:
{code:scala}
object Test {
  def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
val ssc = new StreamingContext(conf, Seconds(1))
val queue = new mutable.Queue[RDD[Int]]

val input = ssc.queueStream(queue)
val output = input.cache().transform(x => x)
output.print()

ssc.start()
for (i <- 1 to 5) {
  val rdd = ssc.sparkContext.parallelize(Seq(i))
  queue.enqueue(rdd)
  Thread.sleep(1000)
}
ssc.stop()
  }
}
{code}

It doesn't seem to be a fatal error, but the WARN messages are a bit unsettling.
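
Since the warnings appear to be harmless, one hedged mitigation is simply to raise the log level for the block manager so the duplicate-unpersist warnings are suppressed:

{code}
// Hedged sketch: silence the harmless duplicate-unpersist warnings.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
{code}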


