[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358958#comment-14358958
 ] 

Ilya Ganelin edited comment on SPARK-4927 at 3/12/15 6:50 PM:
--

Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.

I realized that I already had that code snippet posted. Does running the above 
code not reproduce the issue?



was (Author: ilganeli):
Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.







 Spark does not clean up properly during long jobs. 
 ---

 Key: SPARK-4927
 URL: https://issues.apache.org/jira/browse/SPARK-4927
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

 On a long-running Spark job, Spark will eventually run out of memory on the 
 driver node due to metadata overhead from shuffle operations. Spark will 
 continue to operate, but with drastically decreased performance (since 
 swapping now occurs with every operation).
 The spark.cleaner.ttl parameter allows a user to configure when cleanup 
 happens, but the issue is that the cleanup is not done safely: if it clears a 
 cached RDD or active task in the middle of a stage, the next stage fails with 
 a KeyNotFoundException when it attempts to reference the cleared RDD or task.
 There should be a sustainable mechanism for cleaning up stale metadata that 
 allows the program to continue running. 
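 For context, spark.cleaner.ttl is an ordinary Spark configuration property. A 
 minimal sketch of enabling it (the application name and TTL value here are 
 purely illustrative):
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: enables periodic TTL-based cleanup of old metadata.
// A TTL shorter than the lifetime of a cached RDD or an in-flight stage can
// trigger exactly the unsafe clearing described above.
val conf = new SparkConf()
  .setAppName("long-running-job")       // name is an arbitrary placeholder
  .set("spark.cleaner.ttl", "1800")     // seconds; value is illustrative
val sc = new SparkContext(conf)
{code}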






[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.

2014-12-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14261724#comment-14261724
 ] 

Ilya Ganelin edited comment on SPARK-4927 at 12/31/14 12:33 AM:


The below code can produce this issue. I've also included some log output.
{code:scala}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def showMemoryUsage(sc: SparkContext) = {

  val usersPerStep = 25000
  val count = 100 // note: the loop below only executes when count exceeds usersPerStep
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    usersFiltered.foreach(user => {
      // Look up this user's feature on the driver first; an RDD cannot be
      // referenced from inside another RDD's closure.
      val userFeature = userFeatures.lookup(user).head
      val mult = productFeatures.map(s => s._2 * userFeature)
      mult.takeOrdered(20)

      // Normally this result would then be written to disk;
      // for the sake of the example this is all we're doing.
    })
  }
}
{code}
Example broadcast variable added:
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
CLIENT_NODE (size: 794.0 B, free: 441.9 MB)

And then if I parse the entire log looking for “free: XXX.X MB”, I see the 
available memory slowly ticking away:
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
…
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
And so on.
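
Something along these lines is enough to pull those numbers out of a saved 
driver log (the log file path and the exact message format are assumptions and 
will vary by deployment):
{code:scala}
import scala.io.Source

// Rough sketch: scan a driver log for the BlockManagerInfo lines shown above
// and print the free-memory figure each one reports.
val freeMb = """free: (\d+\.\d+) MB""".r
Source.fromFile("driver.log").getLines()
  .flatMap(line => freeMb.findFirstMatchIn(line).map(_.group(1)))
  .foreach(mb => println(s"Free $mb MB"))
{code}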

Clearly, the above code does not persist the intermediate RDD (mult), yet 
memory is never properly freed.
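
One partial mitigation (a sketch only - it releases cached blocks but does not 
address the driver-side metadata accumulation this issue is about) is to 
explicitly unpersist the cached RDDs once the job is finished with them, e.g. 
at the end of showMemoryUsage:
{code:scala}
// Sketch only: free the explicitly cached RDDs from the example above once
// they are no longer needed.
zippedUsers.unpersist(blocking = true)
userFeatures.unpersist(blocking = true)
productFeatures.unpersist(blocking = true)
{code}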






