[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35968583
  
Hmm, okay, I'm still wondering if there is a path for in-memory data where two copies are created:

Cache manager runs:
```
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
```

Block manager runs:
```
doPut(blockId, Left(values.toIterator), level, tellMaster)
```

DoPut runs:
```
val res = memoryStore.putValues(blockId, values, level, true)
```

MemoryStore runs:
```
val valueEntries = new ArrayBuffer[Any]()
valueEntries ++= values
```
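
Put together, the path above would unroll the same computed values twice when the data is cached deserialized in memory. A minimal standalone sketch of that pattern (hypothetical code just to make the cost visible, not Spark's API):

```
import scala.collection.mutable.ArrayBuffer

// First unroll: the caller copies the computed iterator into a buffer.
val computedValues: Iterator[Any] = Iterator.fill(1000)(new Array[Byte](1024))
val elements = new ArrayBuffer[Any]()
elements ++= computedValues

// Second unroll: the store only sees elements.toIterator, so it copies the
// same values into a second buffer. Both buffers reference the same objects,
// but the data is walked again and two full reference buffers stay live.
val valueEntries = new ArrayBuffer[Any]()
valueEntries ++= elements.toIterator
```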


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10022161
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -25,7 +25,22 @@ import org.apache.spark.SparkConf
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends 
SerializationStream {
   val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+  var counter = 0
+  /* Calling reset to avoid memory leak:
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 1000th time to avoid bloated serialization streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T](t: T): SerializationStream = {
+    objOut.writeObject(t)
+    if (counter >= 1000) {
+      objOut.reset()
+      counter = 0
+    } else {
--- End diff --

This value (1000) should be configurable; that really helps (when using flatMap in particular) to avoid creating multiple instances of the same object during ser-deser time.
But otherwise, very good idea!
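
For what it's worth, a minimal sketch of what a configurable interval might look like, assuming the value is read from SparkConf (the property name `spark.serializer.objectStreamReset` is purely illustrative, not something settled in this thread):

```
import java.io.{ObjectOutputStream, OutputStream}
import org.apache.spark.SparkConf

// Sketch only: the counter/reset idea from the diff above, but with the
// hard-coded 1000 replaced by a configurable value.
private[spark] class ResettingObjectStream(out: OutputStream, conf: SparkConf) {
  private val objOut = new ObjectOutputStream(out)
  private val resetInterval = conf.getInt("spark.serializer.objectStreamReset", 1000)
  private var counter = 0

  def writeObject[T](t: T): this.type = {
    objOut.writeObject(t)
    counter += 1
    if (counter >= resetInterval) {
      objOut.reset()  // clear the stream's back-reference table
      counter = 0
    }
    this
  }
}
```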




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10022071
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
   val computedValues = rdd.computeOrReadCheckpoint(split, context)
   // Persist the result, so long as the task is not running locally
   if (context.runningLocally) { return computedValues }
-  val elements = new ArrayBuffer[Any]
-  elements ++= computedValues
-  blockManager.put(key, elements, storageLevel, tellMaster = true)
-  elements.iterator.asInstanceOf[Iterator[T]]
+  if (storageLevel.useDisk && !storageLevel.useMemory) {
--- End diff --

/cc @RongGu make sure you catch this correctly in the Tachyon PR if this is 
merged... this will be easy to miss.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967757
  
 Merged build triggered.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967767
  
One or more automated tests failed
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12841/




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967758
  
Merged build started.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967766
  
Merged build finished.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967547
  
Merged build finished.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967536
  
Merged build finished.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967549
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12838/




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35967538
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12836/




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10021460
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
   val computedValues = rdd.computeOrReadCheckpoint(split, context)
   // Persist the result, so long as the task is not running locally
   if (context.runningLocally) { return computedValues }
-  val elements = new ArrayBuffer[Any]
-  elements ++= computedValues
-  blockManager.put(key, elements, storageLevel, tellMaster = true)
-  elements.iterator.asInstanceOf[Iterator[T]]
+  if (storageLevel.useDisk && !storageLevel.useMemory) {
+    blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+    return blockManager.get(key)  match {
--- End diff --

`get(key) match` (one space)




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35966452
  
@kellrott  - ah I see, this has just moved the copy from one location to 
another... I retract my comment.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10021400
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -65,17 +65,19 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
 
   override def putValues(
   blockId: BlockId,
-  values: ArrayBuffer[Any],
+  values: Iterator[Any],
   level: StorageLevel,
   returnValues: Boolean)
 : PutResult = {
 
 if (level.deserialized) {
-  val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-  tryToPut(blockId, values, sizeEstimate, true)
-  PutResult(sizeEstimate, Left(values.iterator))
+  val valueEntries = new ArrayBuffer[Any]()
+  valueEntries ++= values
--- End diff --

Making a new buffer here is the issue




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35966227
  
Hey @kellrott - I started to do a review on this, focused on the tests and smaller stuff. But I realized this makes a fairly major change to the block manager API: it now accepts iterators instead of existing buffers. This means a copy is made into a new buffer in the case where an iterator is not used, which is expensive and will regress behavior for existing users. I think that blocks this patch as-is from being merged.

I need to think about this a bit more and see if there is a more surgical/simple solution to fixing this. Since this is not a super common issue (although, agreed, it would be way better to pipeline this write directly to disk), it would be nice if we could avoid changing the code path for normal users.
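
One shape a more surgical fix could take, sketched under the assumption that the put path can accept either an already-unrolled buffer or an iterator and only unroll when it has to (illustrative only, not this PR's code):

```
import scala.collection.mutable.ArrayBuffer

// Callers that already hold a buffer pass Right(buffer) and keep the old
// zero-copy path; only true iterator callers pay for an unroll.
def unrollIfNeeded(values: Either[Iterator[Any], ArrayBuffer[Any]]): ArrayBuffer[Any] =
  values match {
    case Right(buffer)  => buffer            // no copy for existing users
    case Left(iterator) =>                   // unroll exactly once
      val buffer = new ArrayBuffer[Any]()
      buffer ++= iterator
      buffer
  }
```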




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020928
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.commons.io.FileUtils
+import java.io.File
+
+class Expander(base:String, count:Int) extends Iterator[String] {
+  var i = 0;
+  def next() : String = {
+    i += 1;
+    return base + i.toString;
+  }
+  def hasNext() : Boolean = i < count;
+}
+
+object Expander {
+  def expand(s:String, i:Int) : Iterator[String] = {
+    return new Expander(s,i)
+  }
+}
+
+class LargeIteratorSuite extends FunSuite with LocalSparkContext {
+  /* Tests the ability of Spark to deal with user provided iterators that
+   * generate more data then available memory. In any memory based persistance
+   * Spark will unroll the iterator into an ArrayBuffer for caching, however in
+   * the case that the use defines DISK_ONLY persistance, the iterator will be
+   * fed directly to the serializer and written to disk.
+   */
+  val clusterUrl = "local-cluster[1,1,512]"
+  test("Flatmap iterator") {
+    sc = new SparkContext(clusterUrl, "mem_test");
+    val seeds = sc.parallelize( Array(
+      "This is the first sentence that we will test:",
+      "This is the second sentence that we will test:",
+      "This is the third sentence that we will test:"
+    ) );
+    val expand_size = 1000;
--- End diff --

Also, if you write a smaller test, I'd just have the test write and then read back an RDD and make sure they are exactly the same.
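
A rough sketch of that kind of small round-trip check, under the assumption that a DISK_ONLY persist plus a collect is enough to exercise the new write path (names and sizes are placeholders):

```
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Sketch: write a small RDD through the DISK_ONLY path, read it back, and
// check that nothing was lost or reordered.
val sc = new SparkContext("local", "spark-942-roundtrip")
val expected = (1 to 1000).map(i => s"line-$i")
val persisted = sc.parallelize(expected, 2).persist(StorageLevel.DISK_ONLY)
assert(persisted.collect().toSeq == expected)
sc.stop()
```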




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020917
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.commons.io.FileUtils
+import java.io.File
+
+class Expander(base:String, count:Int) extends Iterator[String] {
+  var i = 0;
+  def next() : String = {
+    i += 1;
+    return base + i.toString;
+  }
+  def hasNext() : Boolean = i < count;
+}
+
+object Expander {
+  def expand(s:String, i:Int) : Iterator[String] = {
+    return new Expander(s,i)
+  }
+}
+
+class LargeIteratorSuite extends FunSuite with LocalSparkContext {
+  /* Tests the ability of Spark to deal with user provided iterators that
+   * generate more data then available memory. In any memory based persistance
+   * Spark will unroll the iterator into an ArrayBuffer for caching, however in
+   * the case that the use defines DISK_ONLY persistance, the iterator will be
+   * fed directly to the serializer and written to disk.
+   */
+  val clusterUrl = "local-cluster[1,1,512]"
+  test("Flatmap iterator") {
+    sc = new SparkContext(clusterUrl, "mem_test");
+    val seeds = sc.parallelize( Array(
+      "This is the first sentence that we will test:",
+      "This is the second sentence that we will test:",
+      "This is the third sentence that we will test:"
+    ) );
+    val expand_size = 1000;
--- End diff --

It seems like this test writes a very large amount of data to disk. Spark 
has hundreds of tests and each one can only take a short amount of time. So it 
would be great if you could write a test that just writes a small amount of 
data to disk.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020880
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.commons.io.FileUtils
+import java.io.File
+
+class Expander(base:String, count:Int) extends Iterator[String] {
--- End diff --

Hey just wondering about this class... is this necessary? It might be 
possible to write more concise tests:

```
sc.makeRDD(1 to 1).flatMap(1 to 1000).saveAsTextFile 
```
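
Spelled out as compilable code (with placeholder sizes and output path, since the shorthand above elides them), that suggestion might look like:

```
sc.makeRDD(1 to 1000)
  .flatMap(i => (1 to 1000).map(j => s"$i-$j"))
  .saveAsTextFile("/tmp/spark-942-flatmap-output")  // placeholder path
```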




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020710
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -25,7 +25,22 @@ import org.apache.spark.SparkConf
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends 
SerializationStream {
   val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+  var counter = 0;
+  /* Calling reset to avoid memory leak:
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 1000th time to avoid bloated serialization streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T](t: T): SerializationStream = {
+    objOut.writeObject(t);
--- End diff --

Could you remove this semicolon, and all the other ones in this PR?




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020704
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -25,7 +25,22 @@ import org.apache.spark.SparkConf
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends 
SerializationStream {
   val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+  var counter = 0;
+  /* Calling reset to avoid memory leak:
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 1000th time to avoid bloated serialization streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T](t: T): SerializationStream = {
+    objOut.writeObject(t);
+    if (counter >= 1000) {
+      objOut.reset();
+      counter = 0;
+    } else {
+      counter+=1;
--- End diff --

`counter += 1`




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10020702
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -25,7 +25,22 @@ import org.apache.spark.SparkConf
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends 
SerializationStream {
   val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+  var counter = 0;
+  /* Calling reset to avoid memory leak:
+   * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 1000th time to avoid bloated serialization streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T](t: T): SerializationStream = {
+    objOut.writeObject(t);
+    if (counter >= 1000) {
--- End diff --

It's funny: we had the same problem earlier in Spark, and I looked for a long time and was convinced this reset() thing didn't work this way. We ended up solving it another way. But after reading the OOS source code (again), it looks like this is what we want. Good catch.
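
For context on what reset() buys here, a small standalone demonstration of the handle-table behaviour (plain JDK, nothing Spark-specific; the sizes are arbitrary):

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// ObjectOutputStream remembers every object it has written so it can emit
// back-references. Without reset() that table grows for the life of the
// stream; reset() clears it, at the cost of re-writing class descriptors
// for objects written afterwards.
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
for (i <- 1 to 10000) {
  out.writeObject(java.lang.Integer.valueOf(i))
  if (i % 1000 == 0) out.reset()  // bound the internal reference table
}
out.close()
```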




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35964570
  
Merged build started.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35964569
  
 Merged build triggered.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35963965
  
 Merged build triggered.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35963966
  
Merged build started.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35963553
  
(`sbt/sbt scalastyle` runs its namesake, by the way)




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread kellrott
Github user kellrott commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35961011
  
`sbt/sbt assembly` runs fine. But I haven't re-merged master since 0.9 was released (about 20 days ago...)




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960770
  
The style rules are defined in scalastyle-config.xml, but I'm unsure how to run that test locally. Does it not fail when you compile with "sbt/sbt assembly"?


On Mon, Feb 24, 2014 at 4:24 PM, Kyle Ellrott 
wrote:

> Thanks, I was trying to figure out what the heck that was all about. Must
> be a new code check they recently added.
>
> —
> Reply to this email directly or view it on GitHub.
>




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread kellrott
Github user kellrott commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960651
  
Thanks, I was trying to figure out what the heck that was all about. Must 
be a new code check they recently added.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960555
  
One or more automated tests failed
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12835/




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960554
  
Merged build finished.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960478
  
 Merged build triggered.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960479
  
Merged build started.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960492
  
The test that failed was a line-too-long error:

error file=/root/workspace/SparkPullRequestBuilder/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala message=File line length exceeds 100 characters line=29

Can you wrap that line to under 100 characters? I think it's the one with the URL.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960041
  
One or more automated tests failed
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12833/




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35960040
  
Merged build finished.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35959961
  
 Merged build triggered.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35959962
  
Merged build started.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10011961
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{LocalSparkContext, SparkContext}
+
+class Expander(base:String, count:Int) extends Iterator[String] {
+  var i = 0;
+  def next() : String = {
+    i += 1;
+    return base + i.toString;
+  }
+  def hasNext() : Boolean = i < count;
+}
+
+object Expander {
+  def expand(s:String, i:Int) : Iterator[String] = {
+    return new Expander(s,i)
+  }
+}
+
+class LargeIteratorSuite extends FunSuite with LocalSparkContext {
+  /* Tests the ability of Spark to deal with user provided iterators that
+   * generate more data then available memory. In any memory based persistance
+   * Spark will unroll the iterator into an ArrayBuffer for caching, however in
+   * the case that the use defines DISK_ONLY persistance, the iterator will be
+   * fed directly to the serializer and written to disk.
+   */
+  val clusterUrl = "local-cluster[1,1,512]"
+  test("Flatmap iterator") {
+    sc = new SparkContext(clusterUrl, "mem_test");
+    val seeds = sc.parallelize( Array(
+      "This is the first sentence that we will test:",
+      "This is the second sentence that we will test:",
+      "This is the third sentence that we will test:"
+    ) );
+    val out = seeds.flatMap(Expander.expand(_,1000));
--- End diff --

Could this test the read side as well? Right now it only seems to test that writing doesn't throw an exception; there is no check that correctness is preserved.




Re: SPARK-942 patch review

2014-02-24 Thread Matei Zaharia
Thanks for bringing this up. One issue that makes this harder is that old, inactive PRs on GitHub are not really getting closed, so active ones might get lost among them. For now, please just post on the dev list if your PR is being ignored. We’ll implement some kind of cleanup (at least manually) to close the old ones.

Matei

On Feb 24, 2014, at 1:30 PM, Andrew Ash  wrote:

> Yep that's the one thanks! That's quite a few more people than I thought
> 
> Sent from my mobile phone
> On Feb 24, 2014 1:20 PM, "Nan Zhu"  wrote:
> 
>> Do you mean this
>> https://cwiki.apache.org/confluence/display/SPARK/Committers?
>> 
>> --
>> Nan Zhu
>> 
>> 
>> On Monday, February 24, 2014 at 4:18 PM, Andrew Ash wrote:
>> 
>>> Would love to have a discussion since I know the core contributors are
>>> facing a barrage of PRs and things are falling through the cracks.
>>> 
>>> Is there a list of who can commit to core Spark somewhere? Maybe that
>> list
>>> should be expanded or there should be a rotation of PR duty of some sort.
>>> 
>>> One of the perils of having a vibrant, organic community is that you get
>>> way more contributions than you expected!
>>> 
>>> 
>>> On Mon, Feb 24, 2014 at 1:16 PM, Nan Zhu > zhunanmcg...@gmail.com)> wrote:
>>> 
>>>> yet another email about forgotten PR
>>>> 
>>>> I think Sean would like to start some discussion on the current
>> situation
>>>> where committers are facing a flood of PRs recently (as he said in the
>>>> discussion thread about how to prevent the blob of RDD API)?
>>>> 
>>>> Best,
>>>> 
>>>> --
>>>> Nan Zhu
>>>> 
>>>> 
>>>> On Monday, February 24, 2014 at 4:07 PM, Andrew Ash wrote:
>>>> 
>>>>> Hi Spark devs,
>>>>> 
>>>>> Kyle identified a deficiency in Spark where generating iterators are
>>>>> unrolled into memory and then flushed to disk rather than sent
>> straight
>>>>> 
>>>> 
>>>> to
>>>>> disk when possible.
>>>>> 
>>>>> He's had a patch sitting ready for code review for quite some time
>> now
>>>> (100
>>>>> days) but no response.
>>>>> 
>>>>> Is this something that an admin would be able to review? I for one
>> would
>>>>> find this quite valuable.
>>>>> 
>>>>> Thanks!
>>>>> Andrew
>>>>> 
>>>>> 
>>>>> https://spark-project.atlassian.net/browse/SPARK-942
>>>>> https://github.com/apache/incubator-spark/pull/180
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
>> 
>> 



Re: SPARK-942 patch review

2014-02-24 Thread Andrew Ash
Yep that's the one thanks! That's quite a few more people than I thought

Sent from my mobile phone
On Feb 24, 2014 1:20 PM, "Nan Zhu"  wrote:

> Do you mean this
> https://cwiki.apache.org/confluence/display/SPARK/Committers?
>
> --
> Nan Zhu
>
>
> On Monday, February 24, 2014 at 4:18 PM, Andrew Ash wrote:
>
> > Would love to have a discussion since I know the core contributors are
> > facing a barrage of PRs and things are falling through the cracks.
> >
> > Is there a list of who can commit to core Spark somewhere? Maybe that
> list
> > should be expanded or there should be a rotation of PR duty of some sort.
> >
> > One of the perils of having a vibrant, organic community is that you get
> > way more contributions than you expected!
> >
> >
> > On Mon, Feb 24, 2014 at 1:16 PM, Nan Zhu  zhunanmcg...@gmail.com)> wrote:
> >
> > > yet another email about forgotten PR
> > >
> > > I think Sean would like to start some discussion on the current
> situation
> > > where committers are facing a flood of PRs recently (as he said in the
> > > discussion thread about how to prevent the blob of RDD API)?
> > >
> > > Best,
> > >
> > > --
> > > Nan Zhu
> > >
> > >
> > > On Monday, February 24, 2014 at 4:07 PM, Andrew Ash wrote:
> > >
> > > > Hi Spark devs,
> > > >
> > > > Kyle identified a deficiency in Spark where generating iterators are
> > > > unrolled into memory and then flushed to disk rather than sent
> straight
> > > >
> > >
> > > to
> > > > disk when possible.
> > > >
> > > > He's had a patch sitting ready for code review for quite some time
> now
> > > (100
> > > > days) but no response.
> > > >
> > > > Is this something that an admin would be able to review? I for one
> would
> > > > find this quite valuable.
> > > >
> > > > Thanks!
> > > > Andrew
> > > >
> > > >
> > > > https://spark-project.atlassian.net/browse/SPARK-942
> > > > https://github.com/apache/incubator-spark/pull/180
> > > >
> > >
> > >
> >
> >
> >
>
>
>


Re: SPARK-942 patch review

2014-02-24 Thread Nan Zhu
Do you mean this https://cwiki.apache.org/confluence/display/SPARK/Committers?

-- 
Nan Zhu


On Monday, February 24, 2014 at 4:18 PM, Andrew Ash wrote:

> Would love to have a discussion since I know the core contributors are
> facing a barrage of PRs and things are falling through the cracks.
> 
> Is there a list of who can commit to core Spark somewhere? Maybe that list
> should be expanded or there should be a rotation of PR duty of some sort.
> 
> One of the perils of having a vibrant, organic community is that you get
> way more contributions than you expected!
> 
> 
> On Mon, Feb 24, 2014 at 1:16 PM, Nan Zhu  (mailto:zhunanmcg...@gmail.com)> wrote:
> 
> > yet another email about forgotten PR
> > 
> > I think Sean would like to start some discussion on the current situation
> > where committers are facing a flood of PRs recently (as he said in the
> > discussion thread about how to prevent the blob of RDD API)?
> > 
> > Best,
> > 
> > --
> > Nan Zhu
> > 
> > 
> > On Monday, February 24, 2014 at 4:07 PM, Andrew Ash wrote:
> > 
> > > Hi Spark devs,
> > > 
> > > Kyle identified a deficiency in Spark where generating iterators are
> > > unrolled into memory and then flushed to disk rather than sent straight
> > > 
> > 
> > to
> > > disk when possible.
> > > 
> > > He's had a patch sitting ready for code review for quite some time now
> > (100
> > > days) but no response.
> > > 
> > > Is this something that an admin would be able to review? I for one would
> > > find this quite valuable.
> > > 
> > > Thanks!
> > > Andrew
> > > 
> > > 
> > > https://spark-project.atlassian.net/browse/SPARK-942
> > > https://github.com/apache/incubator-spark/pull/180
> > > 
> > 
> > 
> 
> 
> 




Re: SPARK-942 patch review

2014-02-24 Thread Andrew Ash
Would love to have a discussion since I know the core contributors are
facing a barrage of PRs and things are falling through the cracks.

Is there a list of who can commit to core Spark somewhere?  Maybe that list
should be expanded or there should be a rotation of PR duty of some sort.

One of the perils of having a vibrant, organic community is that you get
way more contributions than you expected!


On Mon, Feb 24, 2014 at 1:16 PM, Nan Zhu  wrote:

> yet another email about forgotten PR
>
> I think Sean would like to start some discussion on the current situation
> where committers are facing a flood of PRs recently (as he said in the
> discussion thread about how to prevent the blob of RDD API)?
>
> Best,
>
> --
> Nan Zhu
>
>
> On Monday, February 24, 2014 at 4:07 PM, Andrew Ash wrote:
>
> > Hi Spark devs,
> >
> > Kyle identified a deficiency in Spark where generating iterators are
> > unrolled into memory and then flushed to disk rather than sent straight
> to
> > disk when possible.
> >
> > He's had a patch sitting ready for code review for quite some time now
> (100
> > days) but no response.
> >
> > Is this something that an admin would be able to review? I for one would
> > find this quite valuable.
> >
> > Thanks!
> > Andrew
> >
> >
> > https://spark-project.atlassian.net/browse/SPARK-942
> > https://github.com/apache/incubator-spark/pull/180
> >
> >
>
>
>


Re: SPARK-942 patch review

2014-02-24 Thread Nan Zhu
yet another email about forgotten PR

I think Sean would like to start some discussion on the current situation where 
committers are facing a flood of PRs recently (as he said in the discussion 
thread about how to prevent the blob of RDD API)?

Best, 

-- 
Nan Zhu


On Monday, February 24, 2014 at 4:07 PM, Andrew Ash wrote:

> Hi Spark devs,
> 
> Kyle identified a deficiency in Spark where generating iterators are
> unrolled into memory and then flushed to disk rather than sent straight to
> disk when possible.
> 
> He's had a patch sitting ready for code review for quite some time now (100
> days) but no response.
> 
> Is this something that an admin would be able to review? I for one would
> find this quite valuable.
> 
> Thanks!
> Andrew
> 
> 
> https://spark-project.atlassian.net/browse/SPARK-942
> https://github.com/apache/incubator-spark/pull/180
> 
> 




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/incubator-spark/pull/180#discussion_r10010480
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -71,10 +71,21 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
   val computedValues = rdd.computeOrReadCheckpoint(split, context)
   // Persist the result, so long as the task is not running locally
   if (context.runningLocally) { return computedValues }
-  val elements = new ArrayBuffer[Any]
-  elements ++= computedValues
-  blockManager.put(key, elements, storageLevel, tellMaster = true)
-  elements.iterator.asInstanceOf[Iterator[T]]
+  if (storageLevel == StorageLevel.DISK_ONLY || storageLevel == StorageLevel.DISK_ONLY_2) {
--- End diff --

StorageLevels have `.useDisk()` and `.useMemory()` methods: https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
Maybe it makes sense to replace this with `storageLevel.useDisk() && !storageLevel.useMemory()`.
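
A sketch of what that check reads like (illustrative only; `isDiskOnly` is just a name for this snippet, not something in the codebase):

```
import org.apache.spark.storage.StorageLevel

// True for DISK_ONLY / DISK_ONLY_2 and any other level that spills to disk
// without also keeping a copy in memory.
def isDiskOnly(level: StorageLevel): Boolean = level.useDisk && !level.useMemory
```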




SPARK-942 patch review

2014-02-24 Thread Andrew Ash
Hi Spark devs,

Kyle identified a deficiency in Spark where generating iterators are
unrolled into memory and then flushed to disk rather than sent straight to
disk when possible.

He's had a patch sitting ready for code review for quite some time now (100
days) but no response.

Is this something that an admin would be able to review?  I for one would
find this quite valuable.

Thanks!
Andrew


https://spark-project.atlassian.net/browse/SPARK-942
https://github.com/apache/incubator-spark/pull/180


[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-24 Thread kellrott
Github user kellrott commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35916887
  
Pull Request log, day 100. It’s been three months since I was submitted, and almost 2 months since I last heard from an admin. I pass all unit tests and fix all known issues, but I’ve reached the last page of the request list, and it’s looking like I’ve been forgotten about. It’s getting cold and I’m starting to lose hope.




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-16 Thread kellrott
Github user kellrott commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-35211241
  
Are there any other remaining issues that are preventing this pull request 
from being reviewed/merged?




[GitHub] incubator-spark pull request: Patch for SPARK-942

2014-02-10 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/incubator-spark/pull/180#issuecomment-34660329
  
I often use Spark for ETL and the ability to stream directly onto (HDFS) 
disk would be valuable for me.  No need to buffer in memory if we don't have to.

Admins, is there anything else you want to see from this PR before merging?



Re: SPARK-942

2013-11-14 Thread Evan Chan
+1 for IteratorWithSizeEstimate.

I believe today only HadoopRDDs are able to give fine grained
progress;  with an enhanced iterator interface (which can still expose
the base Iterator trait) we can extend the possibility of fine grained
progress to all RDDs that implement the enhanced iterator.
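
A minimal sketch of what such an enhanced iterator trait could look like (the name and method are illustrative; nothing like this is defined in the thread):

```
// Hypothetical trait: an iterator that can expose a size estimate up front,
// while still being usable anywhere a plain Iterator is expected.
trait IteratorWithSizeEstimate[+A] extends Iterator[A] {
  /** Approximate number of elements (or bytes) this iterator will produce, if known. */
  def estimatedSize: Option[Long]
}
```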

On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman
 wrote:
>
>> The problem is that the iterator interface only defines 'hasNext' and
>> 'next' methods.
>
> Just a comment from the peanut gallery, but FWIW it seems like being
> able to ask "how much data is here" would be a useful thing for Spark
> to know, even if that means moving away from Iterator itself, or
> something like IteratorWithSizeEstimate/something/something.
>
> Not only for this, but so that, ideally, Spark could basically do
> dynamic partitioning.
>
> E.g. when we load a month's worth of data, it's X GB, but after a few
> maps and filters, it's X/100 GB, so could use X/100 partitions instead.
>
> But right now all partitioning decisions are made up-front,
> via .coalesce/etc. type hints from the programmer, and it seems if
> Spark could delay making partitioning decisions each until RDD could
> like lazily-eval/sample a few lines (hand waving), that would be super
> sexy from our perspective, in terms of doing automatic perf/partition
> optimization.
>
> Huge disclaimer that this is probably a big pita to implement, and
> could likely not be as worthwhile as I naively think it would be.
>
> - Stephen



-- 
--
Evan Chan
Staff Engineer
e...@ooyala.com  |


Re: SPARK-942

2013-11-13 Thread Aaron Davidson
By the way, there are a few places one can look for logs while testing:
- Unit test runner logs (should contain driver and worker logs): core/target/unit-tests.log
- Executor logs: work/app-*

This should help find the root exception when you see one caught by the
DAGScheduler, such as in this case.


On Tue, Nov 12, 2013 at 6:21 PM, Kyle Ellrott  wrote:

> Sure, do you have a URL for your patch?
>
> Kyle
> On Nov 12, 2013 5:59 PM, "Xia, Junluan"  wrote:
>
> > Hi kely
> >
> > I also build a patch for this issue, and pass the test, you could help me
> > to review if you are free.
> >
> > -Original Message-
> > From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> > Sent: Wednesday, November 13, 2013 8:44 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I've posted a patch that I think produces the correct behavior at
> >
> >
> https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8
> >
> > It works fine on my programs, but if I run the unit tests, I get errors
> > like:
> >
> > [info] - large number of iterations *** FAILED ***
> > [info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
> > more than 0 times; aborting job java.lang.ClassCastException:
> > scala.collection.immutable.StreamIterator cannot be cast to
> > scala.collection.mutable.ArrayBuffer
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
> > [info]   at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> > [info]   at
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > [info]   at
> >
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
> > [info]   at org.apache.spark.scheduler.DAGScheduler.org
> > $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
> > [info]   at
> >
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
> >
> >
> > I can't figure out the line number of where the original error occurred.
> > Or why I can't replicate them in my various test programs.
> > Any help would be appreciated.
> >
> > Kyle
> >
> >
> >
> >
> >
> >
> > On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert  > >wrote:
> >
> > > On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> > > stephen.haber...@gmail.com> wrote:
> > >
> > > > Huge disclaimer that this is probably a big pita to implement, and
> > > > could likely not be as worthwhile as I naively think it would be.
> > > >
> > >
> > > My perspective on this is it's already big pita of Spark users today.
> > >
> > > In the absence of explicit directions/hints, Spark should be able to
> > > make ballpark estimates and conservatively pick # of partitions,
> > > storage strategies (e.g., memory vs disk) and other runtime parameters
> > that fit the
> > > deployment architecture/capacities.   If this requires code and extra
> > > runtime resources for sampling/measuring data, guestimating job size,
> > > and so on, so be it.
> > >
> > > Users want working jobs first.  Optimal performance / resource
> > > utilization follow from that.
> > >
> >
>


RE: SPARK-942

2013-11-12 Thread Kyle Ellrott
Sure, do you have a URL for your patch?

Kyle
On Nov 12, 2013 5:59 PM, "Xia, Junluan"  wrote:

> Hi Kyle,
>
> I also built a patch for this issue, and it passes the test; could you help
> me review it if you are free?
>
> -Original Message-
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Wednesday, November 13, 2013 8:44 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I've posted a patch that I think produces the correct behavior at
>
> https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8
>
> It works fine on my programs, but if I run the unit tests, I get errors
> like:
>
> [info] - large number of iterations *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
> more than 0 times; aborting job java.lang.ClassCastException:
> scala.collection.immutable.StreamIterator cannot be cast to
> scala.collection.mutable.ArrayBuffer
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
> [info]   at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> [info]   at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [info]   at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
> [info]   at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
> [info]   at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
>
>
> I can't figure out the line number where the original error occurred, or
> why I can't replicate them in my various test programs.
> Any help would be appreciated.
>
> Kyle
>
>
>
>
>
>
> On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert  >wrote:
>
> > On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> > stephen.haber...@gmail.com> wrote:
> >
> > > Huge disclaimer that this is probably a big pita to implement, and
> > > could likely not be as worthwhile as I naively think it would be.
> > >
> >
> > My perspective on this is that it's already a big pita for Spark users today.
> >
> > In the absence of explicit directions/hints, Spark should be able to
> > make ballpark estimates and conservatively pick # of partitions,
> > storage strategies (e.g., memory vs disk) and other runtime parameters
> that fit the
> > deployment architecture/capacities.   If this requires code and extra
> > runtime resources for sampling/measuring data, guestimating job size,
> > and so on, so be it.
> >
> > Users want working jobs first.  Optimal performance / resource
> > utilization follow from that.
> >
>


RE: SPARK-942

2013-11-12 Thread Xia, Junluan
Hi Kyle,

I also built a patch for this issue, and it passes the test; could you help me
review it if you are free?

-Original Message-
From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu] 
Sent: Wednesday, November 13, 2013 8:44 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

I've posted a patch that I think produces the correct behavior at
https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8

It works fine on my programs, but if I run the unit tests, I get errors
like:

[info] - large number of iterations *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
more than 0 times; aborting job java.lang.ClassCastException:
scala.collection.immutable.StreamIterator cannot be cast to 
scala.collection.mutable.ArrayBuffer
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
[info]   at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
[info]   at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
[info]   at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
[info]   at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)


I can't figure out the line number where the original error occurred, or why
I can't replicate them in my various test programs.
Any help would be appreciated.

Kyle






On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert wrote:

> On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman < 
> stephen.haber...@gmail.com> wrote:
>
> > Huge disclaimer that this is probably a big pita to implement, and 
> > could likely not be as worthwhile as I naively think it would be.
> >
>
> My perspective on this is that it's already a big pita for Spark users today.
>
> In the absence of explicit directions/hints, Spark should be able to 
> make ballpark estimates and conservatively pick # of partitions, 
> storage strategies (e.g., memory vs disk) and other runtime parameters that 
> fit the
> deployment architecture/capacities.   If this requires code and extra
> runtime resources for sampling/measuring data, guestimating job size, 
> and so on, so be it.
>
> Users want working jobs first.  Optimal performance / resource 
> utilization follow from that.
>


Re: SPARK-942

2013-11-12 Thread Kyle Ellrott
I've posted a patch that I think produces the correct behavior at
https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8

It works fine on my programs, but if I run the unit tests, I get errors
like:

[info] - large number of iterations *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
more than 0 times; aborting job java.lang.ClassCastException:
scala.collection.immutable.StreamIterator cannot be cast to
scala.collection.mutable.ArrayBuffer
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
[info]   at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
[info]   at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
[info]   at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
[info]   at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)


I can't figure out the line number where the original error occurred, or
why I can't replicate them in my various test programs.
Any help would be appreciated.

Kyle






On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert wrote:

> On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> stephen.haber...@gmail.com> wrote:
>
> > Huge disclaimer that this is probably a big pita to implement, and
> > could likely not be as worthwhile as I naively think it would be.
> >
>
> My perspective on this is that it's already a big pita for Spark users today.
>
> In the absence of explicit directions/hints, Spark should be able to make
> ballpark estimates and conservatively pick # of partitions, storage
> strategies (e.g., memory vs disk) and other runtime parameters that fit the
> deployment architecture/capacities.   If this requires code and extra
> runtime resources for sampling/measuring data, guestimating job size, and
> so on, so be it.
>
> Users want working jobs first.  Optimal performance / resource utilization
> follow from that.
>


Re: SPARK-942

2013-11-12 Thread Alex Boisvert
On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
stephen.haber...@gmail.com> wrote:

> Huge disclaimer that this is probably a big pita to implement, and
> could likely not be as worthwhile as I naively think it would be.
>

My perspective on this is that it's already a big pita for Spark users today.

In the absence of explicit directions/hints, Spark should be able to make
ballpark estimates and conservatively pick # of partitions, storage
strategies (e.g., memory vs disk) and other runtime parameters that fit the
deployment architecture/capacities.   If this requires code and extra
runtime resources for sampling/measuring data, guestimating job size, and
so on, so be it.

Users want working jobs first.  Optimal performance / resource utilization
follow from that.


Re: SPARK-942

2013-11-12 Thread Stephen Haberman

> The problem is that the iterator interface only defines 'hasNext' and
> 'next' methods.

Just a comment from the peanut gallery, but FWIW it seems like being
able to ask "how much data is here" would be a useful thing for Spark
to know, even if that means moving away from Iterator itself, or
something like IteratorWithSizeEstimate/something/something.
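For concreteness, a rough sketch of what such an interface might look like (the
names here are made up for illustration; nothing like this exists in Spark today):

// Hypothetical: an iterator that can report rough size information so the
// framework could make partitioning/storage decisions lazily instead of up front.
trait IteratorWithSizeEstimate[+A] extends Iterator[A] {
  /** Rough count of elements still to be produced, if the source knows it. */
  def sizeEstimate: Option[Long]
  /** Rough in-memory footprint in bytes, if the source knows it. */
  def byteEstimate: Option[Long]
}

// Trivial wrapper for sources whose size is known up front (e.g. an in-memory Seq).
class KnownSizeIterator[A](underlying: Seq[A]) extends IteratorWithSizeEstimate[A] {
  private val it = underlying.iterator
  override def hasNext: Boolean = it.hasNext
  override def next(): A = it.next()
  override def sizeEstimate: Option[Long] = Some(underlying.length.toLong)
  override def byteEstimate: Option[Long] = None // unknown without object sizing
}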

Not only for this, but so that, ideally, Spark could basically do
dynamic partitioning.

E.g. when we load a month's worth of data, it's X GB, but after a few
maps and filters, it's X/100 GB, so it could use X/100 partitions instead.

But right now all partitioning decisions are made up-front,
via .coalesce/etc. type hints from the programmer, and it seems if
Spark could delay making each partitioning decision until the RDD could
lazily eval/sample a few lines (hand waving), that would be super
sexy from our perspective, in terms of doing automatic perf/partition
optimization.

Huge disclaimer that this is probably a big pita to implement, and
could likely not be as worthwhile as I naively think it would be.

- Stephen


Re: SPARK-942

2013-11-12 Thread Koert Kuipers
If Spark wants to compete as an alternative to MapReduce on Hadoop
clusters, then the assumption should not be that 99.9% of the time data will
fit in memory. It will not.

However, that said, I am fine with a solution where one has to use DISK_ONLY
for this, since that is exactly what MapReduce does too anyhow.


On Mon, Nov 11, 2013 at 8:14 PM, Xia, Junluan  wrote:

> Hi Kyle
>
> I totally agree with you. The 'best' solution currently is to only handle
> the "DISK_ONLY" scenario and put the iterator directly to the BlockManager.
>
> It is too expensive for us to make the code complicated for only a 0.1%
> possibility before we get a perfect solution.
>
> -Original Message-
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Tuesday, November 12, 2013 6:28 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> The problem is that the iterator interface only defines 'hasNext' and
> 'next' methods. So I don't think that there is really any way to estimate
> the total count until the iterator is done traversing. In my particular
> case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzip file
> stream. And one of the files just happens to be several gigabytes large.
> The individual elements spit out by the iterator are all the same; it's
> just that sometimes it spits out a few million more than normal.
>
> It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
> they will probably be producing arrays that fit nicely into memory. Trying
> to do a bunch of extra book keeping (ie unrolling the iterator one at a
> time, trying to figure out if it's gotten too big yet), may be an extra
> complication that makes the code much more complicated while only providing
> a solution for extreme edge cases.
>
> I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
> 'MEMORY_AND_DISK' behaviors the same. If the user knows that their code
> could produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that
> iterator gets passed straight to the BlockManager to be written straight to
> disk. Then all we have to do is change "def put(blockId: BlockId, values:
> Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
> (BlockManager.scala:452), to call 'diskStore.putValues' directly, rather
> than unrolling the iterator and passing it on to the standard 'doPut' like
> it does now.
>
> Kyle
>
>
>
>
> On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan 
> wrote:
>
> > Hi
> >
> > I think it is bad user experience to throw OOM exception when user
> > only persist the RDD with DISK_ONLY or MEMORY_AND_DISK.
> >
> > As Kyle mentioned below, Key point is that CacheManager has unrolled
> > the total Iterator into ArrayBuffer without free memory check, we
> > should estimate size of unrolled iterator object and check if it is
> > beyond current free memory size.
> >
> > We could separate into three scenarios
> >
> > 1. For MEMORY_ONLY, I think it is normal case to throw OOM exception
> > and need user to adjust its application 2. For MEMORY_AND_DISK, we
> > should check if free memory could hold unrolled Arraybuffer, if yes,
> > then it will go with usual path, if no, we will degrade it to
> > DISK_ONLY 3. For DISK_ONLY, I think that we need not unroll the total
> > iterator into ArrayBuffer, because we could write this iterator one by
> > one to disk.
> >
> > So this issue is how to judge if free memory size could hold size of
> > unrolled iterator before it become Arraybuffer.
> >
> > Is there any solution for this case? Could we just unroll first 10% of
> > total iterator into ArrayBuffer, and estimate this size, and total
> > size is equal to 10* size of 10%? apparently it is not perfect.
> >
> > -Original Message-
> > From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> > Sent: Thursday, November 07, 2013 2:59 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I think the usage has to be calculated as the iterator is being put
> > into the arraybuffer.
> > Right now, the BlockManager, in its put method when it gets an
> > iterator named 'values' uses the simple stanza of:
> >
> > def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
> > tellMaster: Boolean)
> > : Long = {
> > val elements = new ArrayBuffer[Any]
> > elements ++= values
> > put(blockId, elements, level, tellMaster) }
> >
> >
> > Completely unrolling the iterator in a single line.

RE: SPARK-942

2013-11-11 Thread Xia, Junluan
Hi Kyle

I totally agree with you. The 'best' solution currently is to only handle the
"DISK_ONLY" scenario and put the iterator directly to the BlockManager.

It is too expensive for us to make the code complicated for only a 0.1%
possibility before we get a perfect solution.

-Original Message-
From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu] 
Sent: Tuesday, November 12, 2013 6:28 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

The problem is that the iterator interface only defines 'hasNext' and 'next'
methods. So I don't think that there is really any way to estimate the total
count until the iterator is done traversing. In my particular case, I'm
wrapping an OpenRDF RIO iterator that is parsing a gzip file stream. And one of
the files just happens to be several gigabytes large.
The individual elements spit out by the iterator are all the same; it's just
that sometimes it spits out a few million more than normal.

It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
they will probably be producing arrays that fit nicely into memory. Trying to 
do a bunch of extra book keeping (ie unrolling the iterator one at a time, 
trying to figure out if it's gotten too big yet), may be an extra complication 
that makes the code much more complicated while only providing a solution for 
extreme edge cases.

I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
'MEMORY_AND_DISK' behaviors the same. If the user knows that their code could 
produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that iterator 
gets passed straight to the BlockManager to be written straight to disk. Then 
all we have to do is change "def put(blockId: BlockId, values:
Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
(BlockManager.scala:452) to call 'diskStore.putValues' directly, rather than
unrolling the iterator and passing it on to the standard 'doPut' like it does
now.

Kyle




On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan  wrote:

> Hi
>
> I think it is bad user experience to throw OOM exception when user 
> only persist the RDD with DISK_ONLY or MEMORY_AND_DISK.
>
> As Kyle mentioned below, Key point is that CacheManager has unrolled 
> the total Iterator into ArrayBuffer without free memory check, we 
> should estimate size of unrolled iterator object and check if it is 
> beyond current free memory size.
>
> We could separate into three scenarios
>
> 1. For MEMORY_ONLY, I think it is normal case to throw OOM exception 
> and need user to adjust its application 2. For MEMORY_AND_DISK, we 
> should check if free memory could hold unrolled Arraybuffer, if yes, 
> then it will go with usual path, if no, we will degrade it to 
> DISK_ONLY 3. For DISK_ONLY, I think that we need not unroll the total
> iterator into ArrayBuffer, because we could write this iterator one by 
> one to disk.
>
> So this issue is how to judge if free memory size could hold size of 
> unrolled iterator before it become Arraybuffer.
>
> Is there any solution for this case? Could we just unroll first 10% of 
> total iterator into ArrayBuffer, and estimate this size, and total 
> size is equal to 10* size of 10%? apparently it is not perfect.
>
> -Original Message-
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Thursday, November 07, 2013 2:59 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I think the usage has to be calculated as the iterator is being put 
> into the arraybuffer.
> Right now, the BlockManager, in its put method when it gets an
> iterator named 'values' uses the simple stanza of:
>
> def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
> tellMaster: Boolean)
> : Long = {
> val elements = new ArrayBuffer[Any]
> elements ++= values
> put(blockId, elements, level, tellMaster) }
>
>
> Completely unrolling the iterator in a single line.  Above it, the 
> CacheManager does the exact same thing with:
>
> val elements = new ArrayBuffer[Any]
> elements ++= computedValues
> blockManager.put(key, elements, storageLevel, tellMaster = true)
>
>
> We would probably have to implement some sort of 'IteratorBuffer' 
> class, which would wrap an iterator. It would include a method to 
> unroll an iterator into a buffer up to a point, something like
>
> def unroll(maxMem:Long) : Boolean ={ ...}
>
> And it would return True if the maxMem was hit. At which point 
> BlockManager could read through the already cached values, then 
> continue on through the rest of the iterators dumping all the values 
> to file. If it unrolled without hitting maxMem (which would probably be
> most of the time), the class would simply wrap the ArrayBuffer of cached
> values.

Re: SPARK-942

2013-11-11 Thread Kyle Ellrott
The problem is that the iterator interface only defines 'hasNext' and
'next' methods. So I don't think that there is really any way to estimate
the total count until the iterator is done traversing. In my particular
case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzip file
stream. And one of the files just happens to be several gigabytes large.
The individual elements spit out by the iterator are all the same; it's
just that sometimes it spits out a few million more than normal.

It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
they will probably be producing arrays that fit nicely into memory. Trying
to do a bunch of extra bookkeeping (i.e. unrolling the iterator one element
at a time and trying to figure out whether it's gotten too big yet) may add
a lot of complexity while only providing a solution for extreme edge cases.

I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
'MEMORY_AND_DISK' behaviors the same. If the user knows that their code
could produce these 'mega-iterators' then they pass 'DISK_ONLY' and that
iterator gets passed straight to the BlockManager to be written straight to
disk. Then all we have to do is change "def put(blockId: BlockId, values:
Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
(BlockManager.scala:452) to call 'diskStore.putValues' directly, rather
than unrolling the iterator and passing it on to the standard 'doPut' like
it does now.

Kyle
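For illustration, a simplified sketch of the change described above (signatures
are approximated; the real BlockManager and DiskStore interfaces differ in
detail, and the surrounding class and imports are omitted):

// Inside BlockManager: for DISK_ONLY blocks, hand the iterator straight to the
// disk store instead of unrolling it into an ArrayBuffer first.
def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
        tellMaster: Boolean): Long = {
  if (level.useDisk && !level.useMemory) {
    // Stream the values to disk one element at a time; nothing is buffered in memory.
    // (The putValues signature here is an approximation.)
    diskStore.putValues(blockId, values, level, returnValues = false)
    0L  // size reporting omitted in this sketch
  } else {
    // Existing behavior: unroll the iterator, then store the buffer.
    val elements = new ArrayBuffer[Any]
    elements ++= values
    put(blockId, elements, level, tellMaster)
  }
}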




On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan  wrote:

> Hi
>
> I think it is bad user experience to throw OOM exception when user only
> persist the RDD with DISK_ONLY or MEMORY_AND_DISK.
>
> As Kyle mentioned below, Key point is that CacheManager has unrolled the
> total Iterator into ArrayBuffer without free memory check, we should
> estimate size of unrolled iterator object and check if it is beyond current
> free memory size.
>
> We could separate into three scenarios
>
> 1. For MEMORY_ONLY, I think it is normal case to throw OOM exception and
> need user to adjust its application
> 2. For MEMORY_AND_DISK, we should check if free memory could hold unrolled
> Arraybuffer, if yes, then it will go with usual path, if no, we will
> degrade it to DISK_ONLY
> 3. For DISK_ONLY, I think that we need not unroll the total iterator into
> ArrayBuffer, because we could write this iterator one by one to disk.
>
> So this issue is how to judge if free memory size could hold size of
> unrolled iterator before it become Arraybuffer.
>
> Is there any solution for this case? Could we just unroll first 10% of
> total iterator into ArrayBuffer, and estimate this size, and total size is
> equal to 10* size of 10%? apparently it is not perfect.
>
> -Original Message-
> From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu]
> Sent: Thursday, November 07, 2013 2:59 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I think the usage has to be calculated as the iterator is being put into
> the arraybuffer.
> Right now, the BlockManager, in its put method when it gets an iterator
> named 'values' uses the simple stanza of:
>
> def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
> tellMaster: Boolean)
> : Long = {
> val elements = new ArrayBuffer[Any]
> elements ++= values
> put(blockId, elements, level, tellMaster) }
>
>
> Completely unrolling the iterator in a single line.  Above it, the
> CacheManager does the exact same thing with:
>
> val elements = new ArrayBuffer[Any]
> elements ++= computedValues
> blockManager.put(key, elements, storageLevel, tellMaster = true)
>
>
> We would probably have to implement some sort of 'IteratorBuffer' class,
> which would wrap an iterator. It would include a method to unroll an
> iterator into a buffer up to a point, something like
>
> def unroll(maxMem:Long) : Boolean ={ ...}
>
> And it would return True if the maxMem was hit. At which point
> BlockManager could read through the already cached values, then continue on
> through the rest of the iterators dumping all the values to file. If it
> unrolled without hitting maxMem (which would probably be most of the time),
> the class would simply wrap the ArrayBuffer of cached values.
>
> Kyle
>
>
>
> On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin  wrote:
>
> > It's not a very elegant solution, but one possibility is for the
> > CacheManager to check whether it will have enough space. If it is
> > running out of space, it skips buffering the output of the iterator and
> > directly writes the output of the iterator to disk (if the storage level
> > allows that).

RE: SPARK-942

2013-11-11 Thread Xia, Junluan
Hi 

I think it is a bad user experience to throw an OOM exception when the user only
persists the RDD with DISK_ONLY or MEMORY_AND_DISK.

As Kyle mentioned below, the key point is that the CacheManager has unrolled the
total Iterator into an ArrayBuffer without a free-memory check; we should estimate
the size of the unrolled iterator object and check whether it is beyond the current
free memory size.

We could separate this into three scenarios:

1. For MEMORY_ONLY, I think it is the normal case to throw an OOM exception and
ask the user to adjust their application.
2. For MEMORY_AND_DISK, we should check whether free memory could hold the
unrolled ArrayBuffer; if yes, it goes down the usual path, if no, we degrade it
to DISK_ONLY.
3. For DISK_ONLY, I think we need not unroll the total iterator into an
ArrayBuffer, because we could write the iterator one element at a time to disk.

So the issue is how to judge whether free memory could hold the size of the
unrolled iterator before it becomes an ArrayBuffer.

Is there any solution for this case? Could we just unroll the first 10% of the
iterator into an ArrayBuffer, estimate that size, and take the total size to be
10x the size of the 10%? Apparently it is not perfect.
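As a back-of-the-envelope illustration of the sampling idea (hypothetical helper
code, not from any posted patch; it assumes something like Spark's internal
SizeEstimator is usable for rough object sizing):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator  // assumed accessible for rough sizing

// Buffer a small prefix of the iterator and report the average bytes per element.
// Without knowing how many elements the iterator will eventually produce, this
// still cannot give a total, which is exactly the difficulty Kyle points out.
def avgBytesPerElement[A](iter: Iterator[A], sample: Int = 100): (ArrayBuffer[A], Double) = {
  val prefix = new ArrayBuffer[A]
  while (iter.hasNext && prefix.length < sample) {
    prefix += iter.next()
  }
  val avg =
    if (prefix.isEmpty) 0.0
    else SizeEstimator.estimate(prefix).toDouble / prefix.length
  (prefix, avg)  // the caller must keep the prefix so no elements are lost
}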

-Original Message-
From: Kyle Ellrott [mailto:kellr...@soe.ucsc.edu] 
Sent: Thursday, November 07, 2013 2:59 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

I think the usage has to be calculated as the iterator is being put into the 
arraybuffer.
Right now, the BlockManager, in its put method when it gets an iterator named
'values' uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
tellMaster: Boolean)
: Long = {
val elements = new ArrayBuffer[Any]
elements ++= values
put(blockId, elements, level, tellMaster) }


Completely unrolling the iterator in a single line.  Above it, the CacheManager 
does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)


We would probably have to implement some sort of 'IteratorBuffer' class, which 
would wrap an iterator. It would include a method to unroll an iterator into a 
buffer up to a point, something like

def unroll(maxMem:Long) : Boolean ={ ...}

And it would return True if the maxMem was hit. At which point BlockManager 
could read through the already cached values, then continue on through the rest 
of the iterators dumping all the values to file. If it unrolled without hitting 
maxMem (which would probably be most of the time), the class would simply wrap 
the ArrayBuffer of cached values.

Kyle



On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin  wrote:

> It's not a very elegant solution, but one possibility is for the 
> CacheManager to check whether it will have enough space. If it is 
> running out of space, it skips buffering the output of the iterator and
> directly writes the output of the iterator to disk (if the storage level allows
> that).
>
> But it is still tricky to know whether we will run out of space before 
> we even start running the iterator. One possibility is to use sizing 
> data from previous partitions to estimate the size of the current partition 
> (i.e.
> estimated in memory size = avg of current in-memory size / current 
> input size).
>
> Do you have any ideas on this one, Kyle?
>
>
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott  >wrote:
>
> > I was wondering if anybody had any thoughts on the best way to 
> > tackle
> > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > Basically, Spark takes an iterator from a flatmap call and because I 
> > tell it that it needs to persist Spark proceeds to push it all into 
> > an array before deciding that it doesn't have enough memory and 
> > trying to
> serialize
> > it to disk, and somewhere along the line it runs out of memory. For 
> > my particular operation, the function return an iterator that reads 
> > data out of a file, and the size of the files passed to that 
> > function can vary greatly (from a few kilobytes to a few gigabytes). 
> > The funny thing is
> that
> > if I do a straight 'map' operation after the flatMap, everything
> > works, because Spark just passes the iterator forward and never 
> > tries to expand the whole thing into memory. But I need do a 
> > reduceByKey across all the records, so I'd like to persist to disk 
> > first, and that is where I hit
> this
> > snag.
> > I've already setup a unit test to replicate the problem, and I know 
> > the area of the code that would need to be fixed.
> > I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
> >
>


Re: SPARK-942

2013-11-06 Thread Kyle Ellrott
I think the usage has to be calculated as the iterator is being put into
the ArrayBuffer.
Right now the BlockManager, in its put method, when it gets an iterator
named 'values', uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
        tellMaster: Boolean): Long = {
  val elements = new ArrayBuffer[Any]
  elements ++= values
  put(blockId, elements, level, tellMaster)
}


Completely unrolling the iterator in a single line.  Above it, the
CacheManager does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)


We would probably have to implement some sort of 'IteratorBuffer' class,
which would wrap an iterator. It would include a method to unroll an
iterator into a buffer up to a point, something like

def unroll(maxMem: Long): Boolean = { ... }

And it would return true if maxMem was hit, at which point the BlockManager
could read through the already cached values, then continue on through the
rest of the iterator, dumping all the values to file. If it unrolled
without hitting maxMem (which would probably be most of the time), the
class would simply wrap the ArrayBuffer of cached values.

Kyle
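For illustration, a rough sketch of the 'IteratorBuffer' idea described above
(hypothetical code, not from any posted patch; it assumes something like Spark's
internal SizeEstimator is available for the periodic size checks):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator  // assumed accessible for rough sizing

// Wraps an iterator and unrolls it into a buffer until maxMem is hit. The caller
// either uses the buffer directly (the common case) or replays the buffered prefix
// followed by the untouched tail and spills everything to disk.
class IteratorBuffer[A](underlying: Iterator[A]) {
  val buffer = new ArrayBuffer[A]

  /** Unrolls into `buffer`; returns true if maxMem was hit before the source ended. */
  def unroll(maxMem: Long, checkEvery: Int = 1000): Boolean = {
    var count = 0
    while (underlying.hasNext) {
      buffer += underlying.next()
      count += 1
      if (count % checkEvery == 0 && SizeEstimator.estimate(buffer) > maxMem) {
        return true
      }
    }
    false
  }

  /** Buffered prefix followed by whatever the source iterator has left. */
  def iterator: Iterator[A] = buffer.iterator ++ underlying
}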



On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin  wrote:

> It's not a very elegant solution, but one possibility is for the
> CacheManager to check whether it will have enough space. If it is running
> out of space, it skips buffering the output of the iterator and directly
> writes the output of the iterator to disk (if the storage level allows that).
>
> But it is still tricky to know whether we will run out of space before we
> even start running the iterator. One possibility is to use sizing data from
> previous partitions to estimate the size of the current partition (i.e.
> estimated in memory size = avg of current in-memory size / current input
> size).
>
> Do you have any ideas on this one, Kyle?
>
>
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott  >wrote:
>
> > I was wondering if anybody had any thoughts on the best way to tackle
> > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > Basically, Spark takes an iterator from a flatmap call and because I tell
> > it that it needs to persist Spark proceeds to push it all into an array
> > before deciding that it doesn't have enough memory and trying to
> serialize
> > it to disk, and somewhere along the line it runs out of memory. For my
> > particular operation, the function return an iterator that reads data out
> > of a file, and the size of the files passed to that function can vary
> > greatly (from a few kilobytes to a few gigabytes). The funny thing is
> that
> > if I do a straight 'map' operation after the flatMap, everything works,
> > because Spark just passes the iterator forward and never tries to expand
> > the whole thing into memory. But I need do a reduceByKey across all the
> > records, so I'd like to persist to disk first, and that is where I hit
> this
> > snag.
> > I've already setup a unit test to replicate the problem, and I know the
> > area of the code that would need to be fixed.
> > I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
> >
>


Re: SPARK-942

2013-11-03 Thread Reynold Xin
It's not a very elegant solution, but one possibility is for the
CacheManager to check whether it will have enough space. If it is running
out of space, it skips buffering the output of the iterator and directly
writes the output of the iterator to disk (if the storage level allows that).

But it is still tricky to know whether we will run out of space before we
even start running the iterator. One possibility is to use sizing data from
previous partitions to estimate the size of the current partition (i.e.
estimated in-memory size = current input size x the in-memory size / input
size ratio observed on previous partitions).

Do you have any ideas on this one, Kyle?
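For illustration, a sketch of that ratio-based estimate (a hypothetical helper,
not part of Spark): track the in-memory-bytes to input-bytes ratio observed on
partitions that were already cached and use it to predict the next partition's
footprint from its input size.

// Records (input bytes, in-memory bytes) pairs from previously cached partitions
// and predicts the in-memory size of a new partition from its input size.
class PartitionSizePredictor {
  private var totalInputBytes = 0L
  private var totalInMemoryBytes = 0L

  def record(inputBytes: Long, inMemoryBytes: Long): Unit = {
    totalInputBytes += inputBytes
    totalInMemoryBytes += inMemoryBytes
  }

  /** Predicted in-memory size for a partition of `inputBytes`; None until we have data. */
  def predict(inputBytes: Long): Option[Long] =
    if (totalInputBytes == 0L) None
    else Some((inputBytes.toDouble * totalInMemoryBytes / totalInputBytes).toLong)
}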


On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott wrote:

> I was wondering if anybody had any thoughts on the best way to tackle
> SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> Basically, Spark takes an iterator from a flatmap call and because I tell
> it that it needs to persist Spark proceeds to push it all into an array
> before deciding that it doesn't have enough memory and trying to serialize
> it to disk, and somewhere along the line it runs out of memory. For my
> particular operation, the function return an iterator that reads data out
> of a file, and the size of the files passed to that function can vary
> greatly (from a few kilobytes to a few gigabytes). The funny thing is that
> if I do a straight 'map' operation after the flatMap, everything works,
> because Spark just passes the iterator forward and never tries to expand
> the whole thing into memory. But I need do a reduceByKey across all the
> records, so I'd like to persist to disk first, and that is where I hit this
> snag.
> I've already setup a unit test to replicate the problem, and I know the
> area of the code that would need to be fixed.
> I'm just hoping for some tips on the best way to fix the problem.
>
> Kyle
>


SPARK-942

2013-10-26 Thread Kyle Ellrott
I was wondering if anybody had any thoughts on the best way to tackle
SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
Basically, Spark takes an iterator from a flatMap call and, because I tell
it that it needs to persist, Spark proceeds to push it all into an array
before deciding that it doesn't have enough memory and trying to serialize
it to disk, and somewhere along the line it runs out of memory. For my
particular operation, the function returns an iterator that reads data out
of a file, and the size of the files passed to that function can vary
greatly (from a few kilobytes to a few gigabytes). The funny thing is that
if I do a straight 'map' operation after the flatMap, everything works,
because Spark just passes the iterator forward and never tries to expand
the whole thing into memory. But I need to do a reduceByKey across all the
records, so I'd like to persist to disk first, and that is where I hit this
snag.
I've already set up a unit test to replicate the problem, and I know the
area of the code that would need to be fixed.
I'm just hoping for some tips on the best way to fix the problem.

Kyle
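For reference, a minimal sketch of the failing pattern described above (the file
paths, parsing logic and local master are placeholders; this is illustrative
only, not the actual unit test):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey
import org.apache.spark.storage.StorageLevel
import scala.io.Source

object Spark942Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "spark-942-repro")
    // Placeholder input: a mix of small and very large files.
    val files = sc.parallelize(Seq("/data/small.txt", "/data/huge.txt"))

    val records = files.flatMap { path =>
      // Lazily streams the file; a plain map() downstream never materializes it...
      Source.fromFile(path).getLines().map(line => (line.take(8), 1))
    }

    // ...but persisting unrolls the iterator into an ArrayBuffer first (even for
    // DISK_ONLY), which is where the OOM reported in SPARK-942 shows up.
    records.persist(StorageLevel.DISK_ONLY)
    records.reduceByKey(_ + _).count()

    sc.stop()
  }
}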