[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15142372
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks 
on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
--- End diff --

@mateiz and I looked and it seems so.
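For readers following the thread: this patch removes the per-stage `serializedInfoCache` shown in the diff above and instead broadcasts the serialized RDD once, so each task only carries a `Broadcast[Array[Byte]]` handle. Below is a minimal sketch of that idea, not the actual patch: the real code uses Spark's internal closure serializer inside the scheduler, and the object and method names here (`RddBroadcastSketch`, `broadcastRdd`, `readRdd`) are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object RddBroadcastSketch {
  // Driver side: serialize the RDD object once and wrap the bytes in a broadcast
  // variable, so executors fetch one shared copy instead of one copy per task.
  def broadcastRdd[T](sc: SparkContext, rdd: RDD[T]): Broadcast[Array[Byte]] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(rdd)
    out.close()
    sc.broadcast(bos.toByteArray)
  }

  // Executor side (inside a task): read the broadcast bytes back into an RDD object.
  def readRdd[T](rddBinary: Broadcast[Array[Byte]]): RDD[T] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(rddBinary.value))
    try in.readObject().asInstanceOf[RDD[T]] finally in.close()
  }
}
```

With this scheme the RDD is serialized once on the driver (the cost the removed `serializedInfoCache` was addressing) and the bytes are fetched by each executor once via the broadcast mechanism, instead of being shipped with every task.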




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1452




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49501654
  
QA tests have started for PR 1488. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16843/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49501642
  
Thanks for taking a look. I'm merging this one as is, and will submit a 
small PR to fix the issues. 




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49501586
  
Jenkins, retest this *pretty* please?




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49501577
  
Jenkins, retest this please




[GitHub] spark pull request: put 'curRequestSize = 0' after 'logDebug' it

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1477#issuecomment-49501569
  
QA tests have started for PR 1477. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16842/consoleFull




[GitHub] spark pull request: Fixed the number of worker thread

2014-07-18 Thread fireflyc
Github user fireflyc commented on the pull request:

https://github.com/apache/spark/pull/1485#issuecomment-49501533
  
My program is Spark Streaming running on Hadoop YARN; it processes a user click
stream.
From reading the code, is the number of worker threads related to blocking?




[GitHub] spark pull request: put 'curRequestSize = 0' after 'logDebug' it

2014-07-18 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1477#issuecomment-49501450
  
Jenkins, test this please.




[GitHub] spark pull request: [SPARK-2588][SQL] Add some more DSLs.

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1491#issuecomment-49500046
  
QA results for PR 1491:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16841/consoleFull




[GitHub] spark pull request: Fixed the number of worker thread

2014-07-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1485#issuecomment-49499453
  
Does your patch fix this problem, or do your Executors just hang after you
reach enough cores? This behavior should not be happening, even with an
unlimited-capacity cached thread pool.




[GitHub] spark pull request: SPARK-2294: fix locality inversion bug in Task...

2014-07-18 Thread CodingCat
Github user CodingCat commented on the pull request:

https://github.com/apache/spark/pull/1313#issuecomment-49499397
  
@lirui-intel, you are right: we will never get a PROCESS_LOCAL + NOPREF setup, since PROCESS_LOCAL is also NODE_LOCAL. I also agree that the valid localities should be recalculated when an executor is lost (we already do that when an executor is added, so there is no reason not to do the same when an executor goes away...)

As @mateiz said, if we add a special locality level between NODE_LOCAL and RACK_LOCAL and always pass it as the parameter of findTask, it seems we don't need to recalculate the valid locality upon the completion of each task: PROCESS_LOCAL and NODE_LOCAL tasks will always be executed before noPref tasks, and when only noPref tasks remain (or all process-local tasks have started), we can still ensure that the noPref tasks are checked on each call into the TaskManager.

Hi all, sorry for the delay on the patch update... it has been a bit of a hectic week; I will update it over the weekend.
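As a rough illustration of the ordering discussed in the message above (the real implementation is `org.apache.spark.scheduler.TaskLocality`; the enum below, the placement of `NO_PREF` between `NODE_LOCAL` and `RACK_LOCAL`, and the `isAllowed` helper are a sketch of the proposal in this thread, not existing Spark code):

```scala
// Hypothetical sketch of the locality ordering proposed above. Lower values are
// "more local"; a task may run at any level at least as local as the level the
// scheduler currently allows.
object LocalitySketch extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  def isAllowed(maxAllowed: Value, candidate: Value): Boolean = candidate <= maxAllowed
}

// Example: when the allowed level is NODE_LOCAL, PROCESS_LOCAL tasks still qualify,
// but NO_PREF and RACK_LOCAL tasks have to wait for the allowed level to relax.
// LocalitySketch.isAllowed(LocalitySketch.NODE_LOCAL, LocalitySketch.PROCESS_LOCAL) == true
// LocalitySketch.isAllowed(LocalitySketch.NODE_LOCAL, LocalitySketch.NO_PREF)       == false
```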





[GitHub] spark pull request: [SPARK-2588][SQL] Add some more DSLs.

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1491#issuecomment-49498566
  
QA tests have started for PR 1491. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16841/consoleFull




[GitHub] spark pull request: [SPARK-2588][SQL] Add some more DSLs.

2014-07-18 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/1491

[SPARK-2588][SQL] Add some more DSLs.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-2588

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1491


commit 2310bf12156d7937181da199ed1d2305d7644cc7
Author: Takuya UESHIN 
Date:   2014-07-15T08:55:00Z

Add some more DSLs.

commit 1023ea0bcd206d4d7f6312dd9726479b8f304a63
Author: Takuya UESHIN 
Date:   2014-07-16T10:53:47Z

Modify tests to use DSLs.






[GitHub] spark pull request: SPARK-1706: Allow multiple executors per worke...

2014-07-18 Thread CodingCat
Github user CodingCat commented on the pull request:

https://github.com/apache/spark/pull/731#issuecomment-49498510
  
@nishkamravi2 It's OK to reuse the parameters; I'm just not sure
which option is more convenient for the user.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-18 Thread cfregly
Github user cfregly commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15141666
  
--- Diff: project/SparkBuild.scala ---
@@ -63,6 +63,10 @@ object SparkBuild extends PomBuild {
   println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use 
-Pganglia-lgpl flag.")
   profiles ++= Seq("spark-ganglia-lgpl")
 }
+if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) {
+  println("NOTE: SPARK_KINESIS_ASL is deprecated, please use 
-Pspark-kinesis-asl flag.")
+  profiles ++= Seq("spark-ganglia-lgpl")
--- End diff --

good catch, stephen.  thanks!




[GitHub] spark pull request: [SPARK-1341] [Streaming] Throttle BlockGenerat...

2014-07-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/945#discussion_r15141465
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala 
---
@@ -146,6 +146,44 @@ class NetworkReceiverSuite extends FunSuite with 
Timeouts {
 assert(recordedData.toSet === generatedData.toSet)
   }
 
+  test("block generator throttling") {
+val blockGeneratorListener = new FakeBlockGeneratorListener
+val blockInterval = 50
+val maxRate = 200
+val conf = new SparkConf().set("spark.streaming.blockInterval", 
blockInterval.toString).
+  set("spark.streaming.receiver.maxRate", maxRate.toString)
+val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, 
conf)
+val expectedBlocks = 20
+val waitTime = expectedBlocks * blockInterval
+val expectedMessages = maxRate * waitTime / 1000
+val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+val generatedData = new ArrayBuffer[Int]
+
+// Generate blocks
+val startTime = System.currentTimeMillis()
+blockGenerator.start()
+var count = 0
+while(System.currentTimeMillis - startTime < waitTime) {
+  blockGenerator += count
+  generatedData += count
+  count += 1
+  Thread.sleep(1)
+}
+blockGenerator.stop()
+
+val recordedData = blockGeneratorListener.arrayBuffers
+assert(blockGeneratorListener.arrayBuffers.size > 0)
+assert(recordedData.flatten.toSet === generatedData.toSet)
+// recordedData size should be close to the expected rate
+assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
+  recordedData.flatten.size <= expectedMessages * 1.1 )
+// the first and last block may be incomplete, so we slice them out
+recordedData.slice(1, recordedData.size - 1).foreach { block =>
+  assert(block.size >= expectedMessagesPerBlock * 0.8 &&
--- End diff --

Maybe. Jenkins is often quite slow, which may be causing
the data rate in this test to be lower than expected.




On Fri, Jul 18, 2014 at 5:29 PM, Kay Ousterhout 
wrote:

> In
> 
streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala:
>
> > +  blockGenerator += count
> > +  generatedData += count
> > +  count += 1
> > +  Thread.sleep(1)
> > +}
> > +blockGenerator.stop()
> > +
> > +val recordedData = blockGeneratorListener.arrayBuffers
> > +assert(blockGeneratorListener.arrayBuffers.size > 0)
> > +assert(recordedData.flatten.toSet === generatedData.toSet)
> > +// recordedData size should be close to the expected rate
> > +assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
> > +  recordedData.flatten.size <= expectedMessages * 1.1 )
> > +// the first and last block may be incomplete, so we slice them out
> > +recordedData.slice(1, recordedData.size - 1).foreach { block =>
> > +  assert(block.size >= expectedMessagesPerBlock * 0.8 &&
>
> This line is failing on Jenkins for an (I think) unrelated change (
> 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16838/consoleFull)
> -- any chance this is flaky?
>
> —
> Reply to this email directly or view it on GitHub
> .
>




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-49497172
  
QA results for PR 1165:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  case class Sample(size: Long, numUpdates: Long)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16840/consoleFull




[GitHub] spark pull request: SPARK-2553. CoGroupedRDD unnecessarily allocat...

2014-07-18 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1461#issuecomment-49496955
  
Oops, sorry about that, the Jenkins messages look too similar now :)




[GitHub] spark pull request: SPARK-2566. Update ShuffleWriteMetrics increme...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1481#issuecomment-49496103
  
QA results for PR 1481:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16839/consoleFull




[GitHub] spark pull request: SPARK-2298: Show stage attempt in UI

2014-07-18 Thread tsudukim
Github user tsudukim commented on the pull request:

https://github.com/apache/spark/pull/1384#issuecomment-49495727
  
Modified the PR per your comments. Thank you!




[GitHub] spark pull request: [SPARK-2583] ConnectionManager cannot distingu...

2014-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1490#issuecomment-49495215
  
Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-2583] ConnectionManager cannot distingu...

2014-07-18 Thread sarutak
GitHub user sarutak opened a pull request:

https://github.com/apache/spark/pull/1490

[SPARK-2583] ConnectionManager cannot distinguish whether error occurred or 
not



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sarutak/spark SPARK-2583

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1490


commit e2b8c4a2595c3196821ffed582ceea487d0d65d4
Author: Kousuke Saruta 
Date:   2014-07-19T01:01:35Z

Modify to propagate error using ConnectionManager






[GitHub] spark pull request: Fixed the number of worker thread

2014-07-18 Thread fireflyc
Github user fireflyc commented on the pull request:

https://github.com/apache/spark/pull/1485#issuecomment-49495043
  
My application has 1000+ worker threads.

![0e75b115d7a1b2dba97284cf6443b6f0](https://cloud.githubusercontent.com/assets/183107/3633383/d939413c-0edf-11e4-91d0-5ab99df71b59.jpeg)





[GitHub] spark pull request: SPARK-2587: Fix error message in make-distribu...

2014-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1489#issuecomment-49494945
  
Can one of the admins verify this patch?




[GitHub] spark pull request: SPARK-2587: Fix error message in make-distribu...

2014-07-18 Thread wagnermarkd
GitHub user wagnermarkd opened a pull request:

https://github.com/apache/spark/pull/1489

SPARK-2587: Fix error message in make-distribution.sh

make-distribution.sh gives a slightly off error message when using 
--with-hive.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wagnermarkd/spark SPARK-2587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1489


commit 7b5d3ffeaa9018c94f05c7bef7617bb89bcc5528
Author: Mark Wagner 
Date:   2014-07-19T00:52:52Z

SPARK-2587: Fix error message in make-distribution.sh






[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-49494682
  
QA tests have started for PR 1165. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16840/consoleFull




[GitHub] spark pull request: Fixed the number of worker thread

2014-07-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1485#issuecomment-49494194
  
The tasks launched on an Executor are controlled by the DAGScheduler, and 
should not exceed the number of cores that executor is advertising. In what 
situation have you seen this happening?
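To make the bound described above concrete, here is an illustrative configuration snippet (not taken from the PR under discussion; the values are arbitrary, and the availability of `spark.executor.cores` depends on the deployment mode):

```scala
import org.apache.spark.SparkConf

// An executor advertising N cores runs at most N / spark.task.cpus tasks at once;
// the scheduler will not assign more tasks than that to the executor.
val conf = new SparkConf()
  .setAppName("concurrency-illustration")
  .set("spark.executor.cores", "4") // cores advertised per executor (assumed value)
  .set("spark.task.cpus", "1")      // cores reserved per task: 4 / 1 = at most 4 concurrent tasks
```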




[GitHub] spark pull request: SPARK-2246: Add user-data option to EC2 script...

2014-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1186#issuecomment-49494131
  
Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-1341] [Streaming] Throttle BlockGenerat...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/945#discussion_r15140315
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala 
---
@@ -146,6 +146,44 @@ class NetworkReceiverSuite extends FunSuite with 
Timeouts {
 assert(recordedData.toSet === generatedData.toSet)
   }
 
+  test("block generator throttling") {
+val blockGeneratorListener = new FakeBlockGeneratorListener
+val blockInterval = 50
+val maxRate = 200
+val conf = new SparkConf().set("spark.streaming.blockInterval", 
blockInterval.toString).
+  set("spark.streaming.receiver.maxRate", maxRate.toString)
+val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, 
conf)
+val expectedBlocks = 20
+val waitTime = expectedBlocks * blockInterval
+val expectedMessages = maxRate * waitTime / 1000
+val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+val generatedData = new ArrayBuffer[Int]
+
+// Generate blocks
+val startTime = System.currentTimeMillis()
+blockGenerator.start()
+var count = 0
+while(System.currentTimeMillis - startTime < waitTime) {
+  blockGenerator += count
+  generatedData += count
+  count += 1
+  Thread.sleep(1)
+}
+blockGenerator.stop()
+
+val recordedData = blockGeneratorListener.arrayBuffers
+assert(blockGeneratorListener.arrayBuffers.size > 0)
+assert(recordedData.flatten.toSet === generatedData.toSet)
+// recordedData size should be close to the expected rate
+assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
+  recordedData.flatten.size <= expectedMessages * 1.1 )
+// the first and last block may be incomplete, so we slice them out
+recordedData.slice(1, recordedData.size - 1).foreach { block =>
+  assert(block.size >= expectedMessagesPerBlock * 0.8 &&
--- End diff --

This line is failing on Jenkins for an (I think) unrelated change 
(https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16838/consoleFull)
 -- any chance this is flaky?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49493898
  
LGTM. Thanks for implementing correlations! Merged into master.




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49493877
  
Jenkins, retest this please


On Fri, Jul 18, 2014 at 5:24 PM, Apache Spark QA 
wrote:

> QA results for PR 1488:
> - This patch FAILED unit tests.
> - This patch merges cleanly
> - This patch adds no public classes
>
> For more information see test output:
>
> 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16838/consoleFull
>
> —
> Reply to this email directly or view it on GitHub
> .
>




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1367




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49493735
  
QA results for PR 1488:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16838/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49493613
  
QA results for PR 1367:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16837/consoleFull




[GitHub] spark pull request: Fixed the number of worker thread

2014-07-18 Thread fireflyc
Github user fireflyc commented on the pull request:

https://github.com/apache/spark/pull/1485#issuecomment-49493529
  
There will always be a task ready to run, and when the system is idle the threads do not consume CPU. When a task needs to run, there is no need to create a new thread.

A `fixed` pool is a good way to do this.
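For context on the fixed-versus-cached trade-off in this thread: a cached pool grows with bursts of work and reaps threads that stay idle for 60 seconds, while a fixed pool keeps a constant number of threads and queues extra work. A minimal, self-contained sketch (not the Executor's actual code; the pool size and sleep times are arbitrary):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object PoolComparison {
  def main(args: Array[String]): Unit = {
    // Cached pool: spawns threads on demand, so a burst of tasks can briefly
    // create many threads; idle threads are reclaimed after 60 seconds.
    val cached = Executors.newCachedThreadPool()

    // Fixed pool: never exceeds `poolSize` threads; extra tasks wait in the queue.
    val poolSize = Runtime.getRuntime.availableProcessors()
    val fixed = Executors.newFixedThreadPool(poolSize)

    (1 to 100).foreach { _ =>
      cached.submit(new Runnable { def run(): Unit = Thread.sleep(10) })
      fixed.submit(new Runnable { def run(): Unit = Thread.sleep(10) })
    }

    cached.shutdown(); fixed.shutdown()
    cached.awaitTermination(1, TimeUnit.MINUTES)
    fixed.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```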




[GitHub] spark pull request: SPARK-2566. Update ShuffleWriteMetrics increme...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1481#issuecomment-49492607
  
QA tests have started for PR 1481. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16839/consoleFull




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-49491986
  
QA results for PR 1165:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  case class Sample(size: Long, numUpdates: Long)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16836/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49491608
  
QA results for PR 1460:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  class AutoSerializer(FramedSerializer):
  class Merger(object):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16835/consoleFull




[GitHub] spark pull request: [YARN] SPARK-2577: File upload to viewfs is br...

2014-07-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/1483#issuecomment-49491293
  
@tgravescs do you know whether we need to do anything to fetch tokens for 
the destination filesystem?  Otherwise the change looks good to me.




[GitHub] spark pull request: SPARK-1706: Allow multiple executors per worke...

2014-07-18 Thread nishkamravi2
Github user nishkamravi2 commented on the pull request:

https://github.com/apache/spark/pull/731#issuecomment-49490129
  
Would it be possible to reuse existing parameters: spark.executor.instances 
and spark.executor.cores instead of introducing new ones? 




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49489583
  
We don't track it right now but it wouldn't be too hard to add to TaskInfo.
 I'll file a JIRA...not a bad starter thing.


On Fri, Jul 18, 2014 at 4:04 PM, Shivaram Venkataraman <
notificati...@github.com> wrote:

> LGTM. BTW it would be good to add average task size and average task
> result size per stage at the top. Is that something we track ?
>
> —
> Reply to this email directly or view it on GitHub
> .
>




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread shivaram
Github user shivaram commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49489355
  
LGTM. BTW it would be good to add average task size and average task result 
size per stage at the top. Is that something we track ?




[GitHub] spark pull request: [YARN] SPARK-2577: File upload to viewfs is br...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1483#issuecomment-49488793
  
QA results for PR 1483:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16833/consoleFull




[GitHub] spark pull request: SPARK-2407: Added Parser of SQL SUBSTR()

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1442#issuecomment-49488836
  
QA results for PR 1442:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16834/consoleFull




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1488#issuecomment-49487961
  
QA tests have started for PR 1488. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16838/consoleFull




[GitHub] spark pull request: Improve scheduler delay tooltip.

2014-07-18 Thread kayousterhout
GitHub user kayousterhout opened a pull request:

https://github.com/apache/spark/pull/1488

Improve scheduler delay tooltip.

As a result of @shivaram's experience debugging long scheduler delay, I 
think we should improve the tooltip to point people in the right direction if 
scheduler delay is large.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kayousterhout/spark-1 better_tooltips

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1488


commit 22176fd5bed72adfbd4f3ad47bf65921a3aebc15
Author: Kay Ousterhout 
Date:   2014-07-18T22:41:04Z

Improve scheduler delay tooltip.






[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49487511
  
QA tests have started for PR 1367. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16837/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49487262
  
Only a set of very minor comments, LGTM.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137404
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -89,62 +31,47 @@ private[spark] object ShuffleMapTask {
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd the final RDD in this stage
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param dep the ShuffleDependency
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  */
 private[spark] class ShuffleMapTask(
 stageId: Int,
-var rdd: RDD[_],
+var rddBinary: Broadcast[Array[Byte]],
 var dep: ShuffleDependency[_, _, _],
-_partitionId: Int,
+partition: Partition,
 @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
-  with Externalizable
-  with Logging {
-
-  protected def this() = this(0, null, null, 0, null)
+  extends Task[MapStatus](stageId, partition.index) with Logging {
+
+  // TODO: Should we also broadcast the ShuffleDependency? For that we 
would need a place to
--- End diff --

Perhaps JIRA-ize this one too




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137295
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks 
on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
+
+  // TODO: Should we also broadcast func? For that we would need a place to
--- End diff --

Perhaps we can just turn this into a JIRA rather than keeping it here in 
the code.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137265
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks 
on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
--- End diff --

Is partitionId the same thing as partition.index?




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15137226
  
--- Diff: python/pyspark/serializers.py ---
@@ -297,6 +297,33 @@ class MarshalSerializer(FramedSerializer):
 loads = marshal.loads
 
 
+class AutoSerializer(FramedSerializer):
+"""
+Choose marshal or cPickle as serialization protocol autumatically
+"""
+def __init__(self):
+FramedSerializer.__init__(self)
+self._type = None
+
+def dumps(self, obj):
+try:
+if self._type is not None:
+raise TypeError("fallback")
+return 'M' + marshal.dumps(obj)
+except Exception:
+self._type = 'P'
+return 'P' + cPickle.dumps(obj, -1)
--- End diff --

I have added a fast path for it, so there is no exception cost anymore.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137230
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
--- End diff --

also past tense -- broadcasted




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137220
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
--- End diff --

*of - ha!




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15137187
  
--- Diff: python/pyspark/serializers.py ---
@@ -297,6 +297,33 @@ class MarshalSerializer(FramedSerializer):
 loads = marshal.loads
 
 
+class AutoSerializer(FramedSerializer):
+"""
+Choose marshal or cPickle as serialization protocol autumatically
+"""
+def __init__(self):
+FramedSerializer.__init__(self)
+self._type = None
+
+def dumps(self, obj):
+try:
+if self._type is not None:
+raise TypeError("fallback")
+return 'M' + marshal.dumps(obj)
+except Exception:
+self._type = 'P'
+return 'P' + cPickle.dumps(obj, -1)
+
+def loads(self, stream):
+_type = stream[0]
+if _type == 'M':
+return marshal.loads(stream[1:])
+elif _type == 'P':
+return cPickle.loads(stream[1:])
--- End diff --

The input of loads() is a string; I have updated the name of the argument.




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-18 Thread srosenthal
Github user srosenthal commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15137055
  
--- Diff: project/SparkBuild.scala ---
@@ -63,6 +63,10 @@ object SparkBuild extends PomBuild {
   println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use 
-Pganglia-lgpl flag.")
   profiles ++= Seq("spark-ganglia-lgpl")
 }
+if (Properties.envOrNone("SPARK_KINESIS_ASL").isDefined) {
+  println("NOTE: SPARK_KINESIS_ASL is deprecated, please use 
-Pspark-kinesis-asl flag.")
+  profiles ++= Seq("spark-ganglia-lgpl")
--- End diff --

Looks like a copy-paste error? Did you mean to use "spark-kinesis-asl" here 
instead of "spark-ganglia-lgpl"?




[GitHub] spark pull request: [SPARK-1981] Add AWS Kinesis streaming support

2014-07-18 Thread srosenthal
Github user srosenthal commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15137079
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -467,6 +468,62 @@ For more details on these additional sources, see the 
corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your 
sources. See the
 [Custom Receiver Guide](streaming-custom-receivers.html).
 
+### Kinesis
+Build notes:
+Spark supports a Kinesis Streaming Receiver which is not included in 
the default build due to licensing restrictions.
+_**Note that by embedding this library you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.
+For sbt users, set the `SPARK_KINESIS_ASL` environment variable before 
building.
+For Maven users, enable the `-Pspark-kinesis-asl` profile.
+User applications will need to link to the `spark-kinesis-asl` 
artifact.
+The Spark Kinesis Streaming Receiver source code, examples, tests, and 
artifacts live in $SPARK_HOME/extras/spark-kinesis-asl.
+
+Deployment and runtime notes:
+Each shard of a stream is processed by one or more KinesisReceiver's 
managed by the Kinesis Client Library (KCL) Worker.
+Said differently, a single KinesisReceiver can process many shards of 
a stream.
+You never need more KinesisReceivers than the number of shards in your 
stream.
+The Kinesis assembly jar must also be present on all worker nodes, as 
they will need access to the Kinesis Client Library.
+/tmp/checkpoint is a valid and accessible directory on all workers (or 
locally if running in local mode)
+This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:
+1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+2) Java System Properties - aws.accessKeyId and aws.secretKey
+3) Credential profiles file - default location (~/.aws/credentials) 
shared by all AWS SDKs
+4) Instance profile credentials - delivered through the Amazon EC2 
metadata service
+
+You need to setup a Kinesis stream with 1 or more shards per the 
following:
+ 
http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
+When you first start up the KinesisReceiver, the Kinesis Client 
Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
+retrieve any checkpoint data, and negotiate with other KCL's reading from 
the same stream.
+During testing, I noticed varying degrees of delays while retrieving 
records from Kinesis depending on which coffee shop in San Francisco I was 
working.
+The input and output data eventually matched, but sometimes after an 
unusually long time.
+Be careful when changing the app name.  Kinesis maintains a mapping 
table in DynamoDB based on this app name 
(http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
  
+Changing the app name could lead to Kinesis errors as only 1 logical 
application can process a stream.
+
+Failure recovery notes:
+The combination of Spark Streaming and Kinesis creates 3 different 
checkpoints as follows:
+  1) RDD data checkpoint (Spark Streaming) - frequency is configurable 
with DStream.checkpoint(Duration)
+  2) RDD metadata checkpoint (Spark Streaming) - frequency is every 
DStream batch
+  3) Kinesis checkpointing (Kinesis) - frequency is controlled by the 
developer calling ICheckpointer.checkpoint() directly
+
+During testing, if you see the same data being read from the stream 
twice, it's likely due to the Kinesis checkpoints not being written.
+Checkpointing too freqently will cause excess load on the AWS 
checkpoint storage layer and may lead to AWS throttling
+Upon startup, a KinesisReceiver will begin processing records with 
sequence numbers greater than the last checkpoint sequence number recorded per 
shard.
+If no checkpoint info exists, the worker will start either from the 
oldest record available (InitialPositionInStream.TRIM_HORIZON)
+or from the tip/latest (InitialPostitionInStream.LATEST).  This is 
configurable.
+When pulling from the stream tip (InitialPositionInStream.LATEST), 
only new stream data will be picked up after the KinesisReceiver starts.
+InitialPositionInStream.LATEST could lead to missed records if data is 
added to the stream while no KinesisReceivers are running.
+In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data
+depending on the checkpoint frequency.
+InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing 
of records depending on the checkpoint frequency.
+Record processing should be idempotent when possible.
+Failed or

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15137023
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since 
it would be split into
+   * numCol RDD[Double]s, each of which sorted, and the joined back into a 
single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+if (numCols > 50) {
+  logWarning("Computing the Spearman correlation matrix can be slow 
for large RDDs with more"
++ " than 50 columns.")
+}
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map { case (vector, index) => (vector(k), 
index) }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, instead of the 1-indexed as 
in the definition for
+   * ranks in the standard definition for Spearman's correlation. This 
does not affect the final
+   * results and is slightly more performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var firstRank = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+ 

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15136854
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since 
it would be split into
+   * numCol RDD[Double]s, each of which sorted, and the joined back into a 
single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+if (numCols > 50) {
+  logWarning("Computing the Spearman correlation matrix can be slow 
for large RDDs with more"
++ " than 50 columns.")
+}
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map { case (vector, index) => (vector(k), 
index) }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, instead of the 1-indexed as 
in the definition for
+   * ranks in the standard definition for Spearman's correlation. This 
does not affect the final
+   * results and is slightly more performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var firstRank = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
--

[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15136736
  
--- Diff: python/pyspark/rdd.py ---
@@ -168,6 +170,123 @@ def _replaceRoot(self, value):
 self._sink(1)
 
 
+class Merger(object):
+"""
+External merger will dump the aggregated data into disks when memory 
usage is above
+the limit, then merge them together.
+
+>>> combiner = lambda x, y:x+y
+>>> merger = Merger(combiner, 10)
+>>> N = 1
+>>> merger.merge(zip(xrange(N), xrange(N)) * 10)
+>>> merger.spills
+100
+>>> sum(1 for k,v in merger.iteritems())
+1
+"""
+
+PARTITIONS = 64
+BATCH = 1000
+
+def __init__(self, combiner, memory_limit=256, path="/tmp/pyspark", 
serializer=None):
+self.combiner = combiner
+self.path = os.path.join(path, str(os.getpid()))
+self.memory_limit = memory_limit
+self.serializer = serializer or 
BatchedSerializer(AutoSerializer(), 1024)
+self.item_limit = None
+self.data = {}
+self.pdata = []
+self.spills = 0
+
+def used_memory(self):
+rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+if platform.system() == 'Linux':
+rss >>= 10
+elif platform.system() == 'Darwin':
+rss >>= 20
--- End diff --

In most cases the merger will work in no-spill mode, so on Windows it's better
to fall back to no-spill mode and show a warning.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-49485183
  
QA tests have started for PR 1165. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16836/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49484674
  
QA tests have started for PR 1460. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16835/consoleFull




[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1486#issuecomment-49483833
  
Does choice 1 (i.e. this patch) change anything about the order in which things
are scheduled?  My understanding (based on
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L183)
is that we use a task's preferred locations to create a hash map of hosts to
tasks that can be run on them, and then when one of the hosts becomes free,
we'll schedule one of the tasks in the corresponding entry in the hash map.
So I don't think the order of preferred locations for a task has any effect on
scheduling order?
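
To make that concrete, here is a rough, self-contained Scala sketch of such an
index; the names are illustrative and not the actual TaskSetManager fields:

    import scala.collection.mutable

    object LocalityIndexSketch {
      // Hypothetical index of pending tasks keyed by host, built from each
      // task's preferred locations.
      private val pendingTasksForHost =
        new mutable.HashMap[String, mutable.ArrayBuffer[Int]]

      def addPendingTask(taskIndex: Int, preferredHosts: Seq[String]): Unit = {
        // Every preferred host gets an entry; the position of a host within
        // the preferred-location list is not recorded anywhere.
        for (host <- preferredHosts) {
          pendingTasksForHost.getOrElseUpdate(host, mutable.ArrayBuffer.empty[Int]) += taskIndex
        }
      }

      def dequeueTaskForHost(host: String): Option[Int] = {
        // Called when `host` has a free slot: any pending task indexed under
        // that host is eligible, so the ordering of a task's preferred
        // locations has no effect on which task gets launched.
        pendingTasksForHost.get(host).filter(_.nonEmpty).map(_.remove(0))
      }
    }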




[GitHub] spark pull request: [SPARK-2523] [SQL] [WIP] Hadoop table scan bug...

2014-07-18 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1439#issuecomment-49483743
  
I'll just add that the `HiveTableReader` vs `HiveTableScan` separation is 
purely artificial, and the split is based on what code was stolen from Shark vs 
what code was written for Spark SQL.  It would be reasonable to combine them at 
some point.  However, for this PR it would be great to just fix the bug at hand.

If we are going to do major refactoring I'd want to see benchmarks showing 
that we aren't introducing any performance regressions.

It would also be nice to see a test case that would be currently failing 
but passes after this PR is added.





[GitHub] spark pull request: Modify default YARN memory_overhead-- from an ...

2014-07-18 Thread nishkamravi2
Github user nishkamravi2 commented on the pull request:

https://github.com/apache/spark/pull/1391#issuecomment-49483642
  
6% was obtained experimentally, with the goal of keeping the bound as tight as
possible without the containers crashing. Three workloads were used in the
experiments: PageRank, WordCount and KMeans, run over moderate to large input
datasets and configured so that the containers are optimally utilized (neither
under-utilized nor over-subscribed). Based on my observations, anything less
than 5% is a no-no. If someone would like to tune this parameter further and
make a case for a higher value (keeping in mind that this is a default value
that will not cover all workloads), that would be helpful.
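
To make the arithmetic concrete, here is a tiny hypothetical Scala sketch of how
a percentage-based default with a floor could be applied; the constant names and
the 384 MB floor are illustrative, not a statement of what the patch hard-codes:

    object MemoryOverheadSketch {
      val OverheadFraction = 0.06  // the experimentally obtained 6%
      val MinOverheadMb = 384      // assumed floor for very small containers

      def overheadFor(executorMemoryMb: Int): Int =
        math.max((OverheadFraction * executorMemoryMb).toInt, MinOverheadMb)

      def main(args: Array[String]): Unit = {
        // An 8 GB executor would be requested with 8192 + 491 MB of container memory.
        println(overheadFor(8192))  // 491
      }
    }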




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135411
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
--- End diff --

The name "Spearman's correlation" (with the possessive) is a lot more prevalent,
whereas "Pearson correlation" (without one) is much more common, but I'm changing
it to SpearmanCorrelation for consistency. This is why I wanted to provide fuzzy
string matching.




[GitHub] spark pull request: [SPARK-2571] Correctly report shuffle read met...

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1476




[GitHub] spark pull request: [SPARK-2571] Correctly report shuffle read met...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1476#issuecomment-49482886
  
Thanks for the review @andrewor14 and @aarondav ! Merged into master (I 
decided not to back port -- I figured since no one has noticed this bug it's 
not worth back porting -- but happy to go ahead and do that if someone thinks 
it's worthwhile).




[GitHub] spark pull request: [SPARK-2540] [SQL] Add HiveDecimal & HiveVarch...

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1436




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135084
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since 
it would be split into
+   * numCol RDD[Double]s, each of which sorted, and the joined back into a 
single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+if (numCols > 50) {
+  logWarning("Computing the Spearman correlation matrix can be slow 
for large RDDs with more"
++ " than 50 columns.")
+}
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map { case (vector, index) => (vector(k), 
index) }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, instead of the 1-indexed as 
in the definition for
+   * ranks in the standard definition for Spearman's correlation. This 
does not affect the final
+   * results and is slightly more performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var firstRank = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
+  padded.flatMap { item =>
+ 

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15135115
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
+
+  /**
+   * Compute Spearman's correlation for two datasets.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute Spearman's correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j.
+   *
+   * Input RDD[Vector] should be cached or checkpointed if possible since 
it would be split into
+   * numCol RDD[Double]s, each of which sorted, and the joined back into a 
single RDD[Vector].
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val indexed = X.zipWithUniqueId()
+
+val numCols = X.first.size
+if (numCols > 50) {
+  logWarning("Computing the Spearman correlation matrix can be slow 
for large RDDs with more"
++ " than 50 columns.")
+}
+val ranks = new Array[RDD[(Long, Double)]](numCols)
+
+// Note: we use a for loop here instead of a while loop with a single 
index variable
+// to avoid race condition caused by closure serialization
+for (k <- 0 until numCols) {
+  val column = indexed.map { case (vector, index) => (vector(k), 
index) }
+  ranks(k) = getRanks(column)
+}
+
+val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
+PearsonCorrelation.computeCorrelationMatrix(ranksMat)
+  }
+
+  /**
+   * Compute the ranks for elements in the input RDD, using the average 
method for ties.
+   *
+   * With the average method, elements with the same value receive the 
same rank that's computed
+   * by taking the average of their positions in the sorted list.
+   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
+   * Note that positions here are 0-indexed, instead of the 1-indexed as 
in the definition for
+   * ranks in the standard definition for Spearman's correlation. This 
does not affect the final
+   * results and is slightly more performant.
+   *
+   * @param indexed RDD[(Double, Long)] containing pairs of the format 
(originalValue, uniqueId)
+   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, 
rank), where uniqueId is
+   * copied from the input RDD.
+   */
+  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] 
= {
+// Get elements' positions in the sorted list for computing average 
rank for duplicate values
+val sorted = indexed.sortByKey().zipWithIndex()
+
+val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
+  // add an extra element to signify the end of the list so that 
flatMap can flush the last
+  // batch of duplicates
+  val padded = iter ++
+Iterator[((Double, Long), Long)](((Double.NaN, Long.MinValue), 
Long.MinValue))
+  var lastVal = 0.0
+  var firstRank = 0.0
+  val IDBuffer = new ArrayBuffer[Long]()
--- End diff --

Use camelCase

[GitHub] spark pull request: [SPARK-2540] [SQL] Add HiveDecimal & HiveVarch...

2014-07-18 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1436#issuecomment-49482636
  
Thanks! I've merged this into master and 1.0.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134693
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmansCorrelation.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+
+/**
+ * Compute Spearman's correlation for two RDDs of the type RDD[Double] or 
the correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Spearman's correlation can be found at
+ * http://en.wikipedia.org/wiki/Spearman's_rank_correlation_coefficient
+ */
+private[stat] object SpearmansCorrelation extends Correlation with Logging 
{
--- End diff --

Should it be called `SpearmanCorrelation` instead of `SpearmansCorrelation`?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134590
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+private[stat] object PearsonCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute the Pearson correlation for two datasets. NaN if either 
vector has 0 variance.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j. 0 covariance results in a 
correlation value of Double.NaN.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix from the covariance matrix.
+   * 0 covariance results in a correlation value of Double.NaN.
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  // TODO remove once covariance numerical issue resolved.
+  cov(i, i) = if (closeToZero(cov(i, i))) 0.0 else math.sqrt(cov(i, i))
+  i +=1
+}
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+var containNaN = false
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val corr = if (sigma == 0.0 || cov(i, i) == 0.0) {
+  containNaN = true
+  Double.NaN
+} else {
+  cov(i, j) / (sigma * cov(i, i))
+}
+cov(i, j) = corr
+cov(j, i) = corr
+i += 1
+  }
+  j += 1
+}
+
+// put 1.0 on the diagonals
+i = 0
+while (i < n) {
+  cov(i, i) = 1.0
+  i +=1
+}
+
+if (containNaN) {
+  logWarning("Pearson correlation matrix contains NaN values.")
+}
+
+Matrices.fromBreeze(cov)
+  }
+
+  private def closeToZero(value: Double, threshhold: Double = 10e-12): 
Boolean = {
--- End diff --

Using `10e-12` is not very common. Did you mean `1e-12` or `1e-11`?
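
The reason for the question is simple arithmetic: `10e-12` parses as
10 x 10^-12 = 1e-11, so the literal is one order of magnitude away from `1e-12`:

    // quick check in a Scala REPL
    println(10e-12 == 1e-11)  // true
    println(10e-12 == 1e-12)  // false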




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134609
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector}
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * Compute Pearson correlation for two RDDs of the type RDD[Double] or the 
correlation matrix
+ * for an RDD of the type RDD[Vector].
+ *
+ * Definition of Pearson correlation can be found at
+ * 
http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient
+ */
+private[stat] object PearsonCorrelation extends Correlation with Logging {
+
+  /**
+   * Compute the Pearson correlation for two datasets. NaN if either 
vector has 0 variance.
+   */
+  override def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double 
= {
+computeCorrelationWithMatrixImpl(x, y)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix S, for the input matrix, where 
S(i, j) is the
+   * correlation between column i and j. 0 covariance results in a 
correlation value of Double.NaN.
+   */
+  override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
+val rowMatrix = new RowMatrix(X)
+val cov = rowMatrix.computeCovariance()
+computeCorrelationMatrixFromCovariance(cov)
+  }
+
+  /**
+   * Compute the Pearson correlation matrix from the covariance matrix.
+   * 0 covariance results in a correlation value of Double.NaN.
+   */
+  def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): 
Matrix = {
+val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]]
+val n = cov.cols
+
+// Compute the standard deviation on the diagonals first
+var i = 0
+while (i < n) {
+  // TODO remove once covariance numerical issue resolved.
+  cov(i, i) = if (closeToZero(cov(i, i))) 0.0 else math.sqrt(cov(i, i))
+  i +=1
+}
+
+// Loop through columns since cov is column major
+var j = 0
+var sigma = 0.0
+var containNaN = false
+while (j < n) {
+  sigma = cov(j, j)
+  i = 0
+  while (i < j) {
+val corr = if (sigma == 0.0 || cov(i, i) == 0.0) {
+  containNaN = true
+  Double.NaN
+} else {
+  cov(i, j) / (sigma * cov(i, i))
+}
+cov(i, j) = corr
+cov(j, i) = corr
+i += 1
+  }
+  j += 1
+}
+
+// put 1.0 on the diagonals
+i = 0
+while (i < n) {
+  cov(i, i) = 1.0
+  i +=1
+}
+
+if (containNaN) {
+  logWarning("Pearson correlation matrix contains NaN values.")
+}
+
+Matrices.fromBreeze(cov)
+  }
+
+  private def closeToZero(value: Double, threshhold: Double = 10e-12): 
Boolean = {
+math.abs(value - 0.0) <= threshhold
--- End diff --

`math.abs(value)`




[GitHub] spark pull request: [SPARK-2535][SQL] Add StringComparison case to...

2014-07-18 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1451#issuecomment-49481477
  
Thanks for doing this, and even adding tests for my code :)

Merged into master.




[GitHub] spark pull request: [SPARK-2535][SQL] Add StringComparison case to...

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1451




[GitHub] spark pull request: SPARK-2294: fix locality inversion bug in Task...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1313#issuecomment-49481250
  
@mridulm I think in the example you talked about, the idea is that the
TaskSetManagers are supposed to be iterated over in the "fair" order -- so it's
fine if we launch rack-local tasks for task set manager A even if there's a
later task set manager that could have launched a node-local task (because the
fact that we tried A first means that scheduling the tasks for A is higher
priority).

In terms of what to do for this patch, it seems like we should follow @mateiz's
suggestion and add an extra, special locality level that TaskSchedulerImpl
tries after all of the valid locality levels, which would allow speculative and
no-pref tasks to be launched.
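
One way to picture that suggestion is the Scala sketch below; the level name and
ordering are hypothetical, not the final design:

    object LocalityLevelSketch extends Enumeration {
      // Valid levels first, then a catch-all level that the scheduler would
      // only reach after the real levels have been exhausted, used to launch
      // speculative and no-preference tasks.
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY, NO_PREF_OR_SPECULATIVE = Value

      // The scheduler would walk the levels in declaration order.
      def schedulingOrder: Seq[Value] = values.toSeq
    }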




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15134018
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait for correlation algorithms.
+ */
+private[stat] trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j. S(i, j) can be NaN if the correlation is 
undefined for column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]. Can be NaN if correlation 
is undefined for the
+   * input vectors.
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions { iter =>
+  iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
+}
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Maintains the default correlation type, pearson
+ */
+private[stat] object Correlations {
+
+  // Note: after new types of correlations are implemented, please update this map
+  val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmansCorrelation))
+  val defaultCorrName: String = "pearson"
+  val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)
+
+  def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
+val correlation = getCorrelationFromName(method)
+correlation.computeCorrelation(x, y)
+  }
+
+  def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
+val correlation = getCorrelationFromName(method)
+correlation.computeCorrelationMatrix(X)
+  }
+
+  /**
+   * Perform simple string processing to match the input correlation name with a known name
+   *
+   * private to mllib for ease of unit testing
+   */
+  private[mllib] def getCorrelationFromName(method: String): Correlation = {
--- End diff --

This function could be further simplified since you already have an object 
map: querying the map directly should be sufficient, and iterating over its 
entries is not necessary.
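
For example, a minimal sketch of that simplification (the exception and its 
message are illustrative, not part of the patch):

    private[mllib] def getCorrelationFromName(method: String): Correlation = {
      // Look the name up directly instead of iterating over the map's entries.
      nameToObjectMap.getOrElse(method,
        throw new IllegalArgumentException(s"Unrecognized method name: $method. " +
          s"Supported correlations: ${nameToObjectMap.keys.mkString(", ")}"))
    }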




[GitHub] spark pull request: SPARK-2407: Added Parser of SQL SUBSTR()

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1442#issuecomment-49480325
  
QA tests have started for PR 1442. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16834/consoleFull




[GitHub] spark pull request: [YARN] SPARK-2577: File upload to viewfs is br...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1483#issuecomment-49480308
  
QA tests have started for PR 1483. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16833/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133921
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Trait for correlation algorithms.
+ */
+private[stat] trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation
+   * between column i and j. S(i, j) can be NaN if the correlation is undefined for column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
+   * correlation implementation for RDD[Vector]. Can be NaN if correlation is undefined for the
+   * input vectors.
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions { iter =>
--- End diff --

`map` is more concise here than `mapPartitions`
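
In other words, something along these lines (a sketch of the suggested change):

    val mat: RDD[Vector] = x.zip(y).map { case (xi, yi) =>
      new DenseVector(Array(xi, yi))
    }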




[GitHub] spark pull request: [SPARK-2269] Refactor mesos scheduler resource...

2014-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1487#issuecomment-49480185
  
Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133872
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+/**
+ * API for statistical functions in MLlib
+ */
+@Experimental
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   * Returns NaN if either vector has 0 variance.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @return Pearson correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @param method String specifying the method to use for computing correlation
+   * @return Correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
+
+  /**
+   * Compute the Pearson correlation for the input RDDs.
+   * Columns with 0 covariance produce NaN entries in the correlation matrix.
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @return A Double containing the Pearson correlation between the two input RDD[Double]s
+   */
+  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
+
+  /**
+   * Compute the correlation for the input RDDs using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * @param x RDD[Double] of the same cardinality as y
+   * @param y RDD[Double] of the same cardinality as x
+   * @param method String specifying the method to use for computing correlation
--- End diff --

ditto.




[GitHub] spark pull request: Modify default YARN memory_overhead-- from an ...

2014-07-18 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/1391#issuecomment-49480064
  
I'll let mridul comment on this, but I think adding a comment explaining where 
0.06 came from would be useful.
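
For example, something along these lines (a sketch only -- the constant name, 
the helper, and the comment wording are placeholders, since the actual 
rationale for 0.06 is exactly what the comment should capture):

    object MemoryOverheadSketch {
      // Placeholder comment: explain how 0.06 was chosen (e.g. measurements of
      // typical off-heap overhead) so readers don't have to guess.
      val MEMORY_OVERHEAD_FACTOR = 0.06

      // Hypothetical helper showing how the named factor would be used.
      def memoryOverhead(executorMemoryMB: Int): Int =
        (MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt
    }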




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133828
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+/**
+ * API for statistical functions in MLlib
+ */
+@Experimental
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   * Returns NaN if either vector has 0 variance.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @return Pearson correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage
--- End diff --

Missing `.` at the end.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r15133829
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+/**
+ * API for statistical functions in MLlib
+ */
+@Experimental
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   * Returns NaN if either vector has 0 variance.
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @return Pearson correlation matrix comparing columns in X.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Methods currently supported: pearson (default), spearman
+   *
+   * Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
+   * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
+   * avoid recomputing the common lineage
+   *
+   * @param X an RDD[Vector] for which the correlation matrix is to be computed.
+   * @param method String specifying the method to use for computing correlation
--- End diff --

Besides mentioning them in the method doc, could you also list the supported 
method names and the default value on this line? That would make it easier for 
users to find what the options are.
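
For example, the doc for the matrix variant could read (suggested wording only):

    /**
     * Compute the correlation matrix for the input RDD of Vectors using the specified method.
     *
     * @param X an RDD[Vector] for which the correlation matrix is to be computed.
     * @param method String specifying the method to use for computing correlation.
     *               Supported: "pearson" (default), "spearman".
     * @return Correlation matrix comparing columns in X.
     */
    def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)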




[GitHub] spark pull request: [SPARK-2269] Refactor mesos scheduler resource...

2014-07-18 Thread tnachen
GitHub user tnachen opened a pull request:

https://github.com/apache/spark/pull/1487

[SPARK-2269] Refactor mesos scheduler resourceOffers and add unit test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tnachen/spark resource_offer_refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1487


commit 614459c8cf31add01c9c1d0a94fe09bdcfcd4019
Author: Timothy Chen 
Date:   2014-07-18T21:09:10Z

Refactor mesos scheduler resourceOffers and add unit test






[GitHub] spark pull request: SPARK-2407: Added Parser of SQL SUBSTR()

2014-07-18 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1442#issuecomment-49479943
  
test this please




[GitHub] spark pull request: [YARN] SPARK-2577: File upload to viewfs is br...

2014-07-18 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/1483#issuecomment-49479880
  
Jenkins, test this please 




[GitHub] spark pull request: [SPARK-2571] Correctly report shuffle read met...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1476#discussion_r15133729
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
 metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
 metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
 metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
-metrics.shuffleReadMetrics =
-  Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
+Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
+  metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
--- End diff --

What do you mean here? updateShuffleReadMetrics is the only way to set it, 
right?




[GitHub] spark pull request: Fix sbt script

2014-07-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/260#issuecomment-49479738
  
Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-2571] Correctly report shuffle read met...

2014-07-18 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1476#discussion_r15133588
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
   /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
-  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+
+  def shuffleReadMetrics = _shuffleReadMetrics
--- End diff --

That's fine




[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-07-18 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/151#issuecomment-49479425
  
I am hoping to deprecate the hadoop 0.23/yarn-alpha support at some point, 
hopefully late this year, but we'll have to figure out which Spark release it 
makes sense to deprecate it in and the exact timeline for getting everything 
off of it.




[GitHub] spark pull request: [MLlib] SPARK-1536: multiclass classification ...

2014-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/886




[GitHub] spark pull request: [MLlib] SPARK-1536: multiclass classification ...

2014-07-18 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/886#issuecomment-49479172
  
@manishamde Thanks for adding multiclass support to decision tree! And thanks 
to everyone for the code review!

Merged into master.




[GitHub] spark pull request: [SPARK-2571] Correctly report shuffle read met...

2014-07-18 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1476#discussion_r15133407
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
   /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
-  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+
+  def shuffleReadMetrics = _shuffleReadMetrics
--- End diff --

I was trying to force people to use updateShuffleReadMetrics() rather than 
updating shuffleReadMetrics directly, to avoid accidentally overwriting 
existing values... happy to do this in a different way, though, if you think 
something else would be better / more readable.
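
For readers following the thread, the pattern being described is roughly the 
following (a sketch; the ShuffleReadMetrics placeholder and the body of 
updateShuffleReadMetrics are assumptions, not the exact code in the patch):

    // Placeholder type, just to keep the sketch self-contained.
    case class ShuffleReadMetrics(remoteBytesRead: Long)

    class TaskMetrics extends Serializable {
      // The field is private, so callers must go through updateShuffleReadMetrics
      // and cannot accidentally clobber metrics that were already recorded.
      private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

      def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

      // Sketch only: the real method may aggregate rather than simply replace.
      def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics): Unit = {
        _shuffleReadMetrics = Some(newMetrics)
      }
    }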




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1367#issuecomment-49477683
  
QA results for PR 1367:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16830/consoleFull




[GitHub] spark pull request: SPARK-2564. ShuffleReadMetrics.totalBlocksRead...

2014-07-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/1474#issuecomment-49477720
  
As far as I can tell the test failures don't seem to be related.



