spark git commit: [SPARK-6765] Fix test code style for streaming.

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8d2a36c0f -> 15e0d2bd1


[SPARK-6765] Fix test code style for streaming.

So we can turn style checker on for test code.

Author: Reynold Xin r...@databricks.com

Closes #5409 from rxin/test-style-streaming and squashes the following commits:

7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15e0d2bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15e0d2bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15e0d2bd

Branch: refs/heads/master
Commit: 15e0d2bd1304db62fad286c1bb687e87c361e16c
Parents: 8d2a36c
Author: Reynold Xin r...@databricks.com
Authored: Wed Apr 8 00:24:59 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 00:24:59 2015 -0700

--
 .../flume/FlumePollingStreamSuite.scala | 29 ++---
 .../streaming/flume/FlumeStreamSuite.scala  |  4 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  3 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  6 ++-
 .../spark/streaming/CheckpointSuite.scala   | 45 +++-
 .../apache/spark/streaming/FailureSuite.scala   |  4 +-
 .../spark/streaming/InputStreamsSuite.scala | 15 ---
 .../streaming/ReceivedBlockHandlerSuite.scala   |  4 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   |  6 ++-
 .../apache/spark/streaming/ReceiverSuite.scala  | 11 ++---
 .../spark/streaming/StreamingContextSuite.scala |  5 ++-
 .../streaming/StreamingListenerSuite.scala  |  4 +-
 .../apache/spark/streaming/TestSuiteBase.scala  | 28 ++--
 .../spark/streaming/UISeleniumSuite.scala   |  3 +-
 .../spark/streaming/WindowOperationsSuite.scala |  4 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 12 --
 .../streaming/scheduler/JobGeneratorSuite.scala |  2 +-
 .../streaming/util/WriteAheadLogSuite.scala |  2 +-
 .../spark/streamingtest/ImplicitSuite.scala |  3 +-
 19 files changed, 115 insertions(+), 75 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
--
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index e04d408..2edea9b 100644
--- 
a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ 
b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -1,21 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package org.apache.spark.streaming.flume
 
 import java.net.InetSocketAddress
@@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with 
BeforeAndAfter with Logging
 assert(counter === totalEventsPerChannel * channels.size)
   }
 
-  def assertChannelIsEmpty(channel: MemoryChannel) = {
+  def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
     val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
 queueRemaining.setAccessible(true)

spark git commit: [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...

2015-04-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 15e0d2bd1 -> f7e21dd1e


[SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...



In particular, this makes pyspark in yarn-cluster mode fail unless
SPARK_HOME is set, when it's not really needed.

Author: Marcelo Vanzin van...@cloudera.com

Closes #5405 from vanzin/SPARK-6506 and squashes the following commits:

e184507 [Marcelo Vanzin] [SPARK-6506] [pyspark] Do not try to retrieve 
SPARK_HOME when not needed.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7e21dd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7e21dd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7e21dd1

Branch: refs/heads/master
Commit: f7e21dd1ec4541be54eb01d8b15cfcc6714feed0
Parents: 15e0d2b
Author: Marcelo Vanzin van...@cloudera.com
Authored: Wed Apr 8 10:14:52 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Wed Apr 8 10:14:52 2015 -0700

--
 python/pyspark/java_gateway.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7e21dd1/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 0a16cbd..2a5e84a 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,11 +29,10 @@ from pyspark.serializers import read_int
 
 
 def launch_gateway():
-    SPARK_HOME = os.environ["SPARK_HOME"]
-
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
     else:
+        SPARK_HOME = os.environ["SPARK_HOME"]
         # Launch the Py4j gateway using Spark's run command so that we pick up
         # the proper classpath and settings from spark-env.sh
         on_windows = platform.system() == "Windows"
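
For context, a minimal standalone sketch (not the real launch_gateway code) of the pattern this diff applies, namely requiring an environment variable only on the code path that actually uses it:

import os

def resolve_gateway_port():
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        # A gateway was already started for us (e.g. yarn-cluster mode),
        # so SPARK_HOME is never consulted on this path.
        return int(os.environ["PYSPARK_GATEWAY_PORT"])
    # Only this branch needs SPARK_HOME, to locate the scripts that spawn the JVM.
    spark_home = os.environ["SPARK_HOME"]
    raise NotImplementedError("illustration only: the JVM gateway would be launched from " + spark_home)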


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...

2015-04-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cdef7d080 -> e967ecaca


[SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...



In particular, this makes pyspark in yarn-cluster mode fail unless
SPARK_HOME is set, when it's not really needed.

Author: Marcelo Vanzin van...@cloudera.com

Closes #5405 from vanzin/SPARK-6506 and squashes the following commits:

e184507 [Marcelo Vanzin] [SPARK-6506] [pyspark] Do not try to retrieve 
SPARK_HOME when not needed.

(cherry picked from commit f7e21dd1ec4541be54eb01d8b15cfcc6714feed0)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e967ecac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e967ecac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e967ecac

Branch: refs/heads/branch-1.3
Commit: e967ecacad8075ef521fbc1a501e074c861d0fe7
Parents: cdef7d0
Author: Marcelo Vanzin van...@cloudera.com
Authored: Wed Apr 8 10:14:52 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Wed Apr 8 10:15:10 2015 -0700

--
 python/pyspark/java_gateway.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e967ecac/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 936857e..19ee2e3 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -29,11 +29,10 @@ from pyspark.serializers import read_int
 
 
 def launch_gateway():
-    SPARK_HOME = os.environ["SPARK_HOME"]
-
     if "PYSPARK_GATEWAY_PORT" in os.environ:
         gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
     else:
+        SPARK_HOME = os.environ["SPARK_HOME"]
         # Launch the Py4j gateway using Spark's run command so that we pick up
         # the proper classpath and settings from spark-env.sh
         on_windows = platform.system() == "Windows"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests

2015-04-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f7fe87f4b -> 7a1583917


[SPARK-6753] Clone SparkConf in ShuffleSuite tests

Prior to this change, the unit test for SPARK-3426 did not clone the
original SparkConf, which meant that that test did not use the options
set by suites that subclass ShuffleSuite.scala. This commit fixes that
problem.

JoshRosen would be great if you could take a look at this, since you wrote this
test originally.

Author: Kay Ousterhout kayousterh...@gmail.com

Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits:

368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests

(cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a158391
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a158391
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a158391

Branch: refs/heads/branch-1.2
Commit: 7a1583917e4a73a464d3db57406fe708a9283c7c
Parents: f7fe87f
Author: Kay Ousterhout kayousterh...@gmail.com
Authored: Wed Apr 8 10:26:45 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Wed Apr 8 10:27:21 2015 -0700

--
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a158391/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 96cb8e4..64202e3 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -243,14 +243,14 @@ abstract class ShuffleSuite extends FunSuite with 
Matchers with LocalSparkContex
       shuffleSpillCompress <- Set(true, false);
       shuffleCompress <- Set(true, false)
     ) {
-      val conf = new SparkConf()
+      val myConf = conf.clone()
         .setAppName("test")
         .setMaster("local")
         .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
         .set("spark.shuffle.compress", shuffleCompress.toString)
         .set("spark.shuffle.memoryFraction", "0.001")
       resetSparkContext()
-      sc = new SparkContext(conf)
+      sc = new SparkContext(myConf)
       try {
         sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
   } catch {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests

2015-04-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 39761f515 -> ee06e9271


[SPARK-6753] Clone SparkConf in ShuffleSuite tests

Prior to this change, the unit test for SPARK-3426 did not clone the
original SparkConf, which meant that that test did not use the options
set by suites that subclass ShuffleSuite.scala. This commit fixes that
problem.

JoshRosen would be great if you could take a look at this, since you wrote this
test originally.

Author: Kay Ousterhout kayousterh...@gmail.com

Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits:

368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests

(cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee06e927
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee06e927
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee06e927

Branch: refs/heads/branch-1.1
Commit: ee06e9271017d2730147ac08a228c771630b3688
Parents: 39761f5
Author: Kay Ousterhout kayousterh...@gmail.com
Authored: Wed Apr 8 10:26:45 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Wed Apr 8 10:27:37 2015 -0700

--
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee06e927/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 625702b..eba3948 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -244,14 +244,14 @@ class ShuffleSuite extends FunSuite with Matchers with 
LocalSparkContext {
       shuffleSpillCompress <- Set(true, false);
       shuffleCompress <- Set(true, false)
     ) {
-      val conf = new SparkConf()
+      val myConf = conf.clone()
         .setAppName("test")
         .setMaster("local")
         .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
         .set("spark.shuffle.compress", shuffleCompress.toString)
         .set("spark.shuffle.memoryFraction", "0.001")
       resetSparkContext()
-      sc = new SparkContext(conf)
+      sc = new SparkContext(myConf)
       try {
         sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
   } catch {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests

2015-04-08 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e967ecaca -> 3b655680c


[SPARK-6753] Clone SparkConf in ShuffleSuite tests

Prior to this change, the unit test for SPARK-3426 did not clone the
original SparkConf, which meant that that test did not use the options
set by suites that subclass ShuffleSuite.scala. This commit fixes that
problem.

JoshRosen would be great if you could take a look at this, since you wrote this
test originally.

Author: Kay Ousterhout kayousterh...@gmail.com

Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits:

368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests

(cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b655680
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b655680
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b655680

Branch: refs/heads/branch-1.3
Commit: 3b655680c4f5c71c903af5a71c96447e03350f93
Parents: e967eca
Author: Kay Ousterhout kayousterh...@gmail.com
Authored: Wed Apr 8 10:26:45 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Wed Apr 8 10:27:07 2015 -0700

--
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3b655680/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index f57921b..30b6184 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -242,14 +242,14 @@ abstract class ShuffleSuite extends FunSuite with 
Matchers with LocalSparkContex
       shuffleSpillCompress <- Set(true, false);
       shuffleCompress <- Set(true, false)
     ) {
-      val conf = new SparkConf()
+      val myConf = conf.clone()
         .setAppName("test")
         .setMaster("local")
         .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
         .set("spark.shuffle.compress", shuffleCompress.toString)
         .set("spark.shuffle.memoryFraction", "0.001")
       resetSparkContext()
-      sc = new SparkContext(conf)
+      sc = new SparkContext(myConf)
       try {
         sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
   } catch {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6765] Fix test code style for graphx.

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 9d44ddce1 -> 8d812f998


[SPARK-6765] Fix test code style for graphx.

So we can turn style checker on for test code.

Author: Reynold Xin r...@databricks.com

Closes #5410 from rxin/test-style-graphx and squashes the following commits:

89e253a [Reynold Xin] [SPARK-6765] Fix test code style for graphx.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d812f99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d812f99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d812f99

Branch: refs/heads/master
Commit: 8d812f9986f2edf420a18ca822711c9765f480e2
Parents: 9d44ddc
Author: Reynold Xin r...@databricks.com
Authored: Wed Apr 8 11:31:48 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 11:31:48 2015 -0700

--
 .../org/apache/spark/graphx/GraphSuite.scala| 71 +++-
 .../apache/spark/graphx/LocalSparkContext.scala |  2 +-
 .../apache/spark/graphx/VertexRDDSuite.scala| 26 +++
 .../graphx/lib/ConnectedComponentsSuite.scala   | 18 ++---
 .../apache/spark/graphx/lib/PageRankSuite.scala | 33 -
 .../lib/StronglyConnectedComponentsSuite.scala  | 23 ---
 6 files changed, 88 insertions(+), 85 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 8d15150..a570e4e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
   val doubleRing = ring ++ ring
   val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
   assert(graph.edges.count() === doubleRing.size)
-      assert(graph.edges.collect.forall(e => e.attr == 1))
+      assert(graph.edges.collect().forall(e => e.attr == 1))
 
   // uniqueEdges option should uniquify edges and store duplicate count in 
edge attributes
   val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, 
Some(RandomVertexCut))
   assert(uniqueGraph.edges.count() === ring.size)
-      assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
+      assert(uniqueGraph.edges.collect().forall(e => e.attr == 2))
 }
   }
 
@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
   assert( graph.edges.count() === rawEdges.size )
   // Vertices not explicitly provided but referenced by edges should be 
created automatically
   assert( graph.vertices.count() === 100)
-      graph.triplets.collect.map { et =>
+      graph.triplets.collect().map { et =>
         assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
         assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
   }
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
   val n = 5
   val star = starGraph(sc, n)
-      assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
-        (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
+      assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet
+        === (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
 }
   }
 
   test(partitionBy) {
     withSpark { sc =>
-      def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
-      def nonemptyParts(graph: Graph[Int, Int]) = {
+      def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = {
+        Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+      }
+      def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = {
         graph.edges.partitionsRDD.mapPartitions { iter =>
           Iterator(iter.next()._2.iterator.toList)
         }.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
       // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
       // the same partition
-      assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+      assert(
+        nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
       // partitionBy(EdgePartition2D) puts identical edges in the same partition
       assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count

spark git commit: [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6ada4f6f5 -> 2f482d706


[SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme

Fixed the following error
query.where('key > 30).select(avg('key)).collect()
<console>:43: error: value > is not a member of Symbol
              query.where('key > 30).select(avg('key)).collect()

Author: Tijo Thomas tijopara...@gmail.com

Closes #5415 from tijoparacka/ERROR_SQL_DATAFRAME_EXAMPLE and squashes the 
following commits:

234751e [Tijo Thomas] Fixed Query DSL error in spark sql Readme


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f482d70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f482d70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f482d70

Branch: refs/heads/master
Commit: 2f482d706b9d38820472c3152dbd1612c98729bd
Parents: 6ada4f6
Author: Tijo Thomas tijopara...@gmail.com
Authored: Wed Apr 8 13:42:29 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 13:42:29 2015 -0700

--
 sql/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f482d70/sql/README.md
--
diff --git a/sql/README.md b/sql/README.md
index fbb3200..237620e 100644
--- a/sql/README.md
+++ b/sql/README.md
@@ -56,6 +56,6 @@ res2: Array[org.apache.spark.sql.Row] = Array([238,val_238], 
[86,val_86], [311,v
 
 You can also build further queries on top of these `DataFrames` using the 
query DSL.
 ```
-scala> query.where('key > 30).select(avg('key)).collect()
+scala> query.where(query("key") > 30).select(avg(query("key"))).collect()
 res3: Array[org.apache.spark.sql.Row] = Array([274.79025423728814])
 ```


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6781] [SQL] use sqlContext in python shell

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 66159c350 -> 6ada4f6f5


[SPARK-6781] [SQL] use sqlContext in python shell

Use `sqlContext` in the PySpark shell, making it consistent with the SQL programming
guide. `sqlCtx` is also kept for compatibility.
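
A short, self-contained sketch of the convention this change standardizes on; the app name and data below are made up for illustration:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="SqlContextNamingExample")
# Standalone scripts create their own context; the PySpark shell now predefines
# the same variable name, `sqlContext` (keeping `sqlCtx` as a compatibility alias).
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.registerTempTable("pairs")
print(sqlContext.sql("SELECT COUNT(*) FROM pairs").collect())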

Author: Davies Liu dav...@databricks.com

Closes #5425 from davies/sqlCtx and squashes the following commits:

af67340 [Davies Liu] sqlCtx - sqlContext
15a278f [Davies Liu] use sqlContext in python shell


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ada4f6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ada4f6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ada4f6f

Branch: refs/heads/master
Commit: 6ada4f6f52cf1d992c7ab0c32318790cf08b0a0d
Parents: 66159c3
Author: Davies Liu dav...@databricks.com
Authored: Wed Apr 8 13:31:45 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 13:31:45 2015 -0700

--
 docs/ml-guide.md|  2 +-
 docs/sql-programming-guide.md   |  4 +-
 .../apache/spark/examples/sql/JavaSparkSQL.java | 20 ++---
 .../ml/simple_text_classification_pipeline.py   |  2 +-
 .../src/main/python/mllib/dataset_example.py|  6 +-
 python/pyspark/ml/classification.py |  4 +-
 python/pyspark/ml/feature.py|  4 +-
 python/pyspark/shell.py |  6 +-
 python/pyspark/sql/context.py   | 79 ++--
 python/pyspark/sql/dataframe.py |  6 +-
 python/pyspark/sql/functions.py |  2 +-
 python/pyspark/sql/types.py |  4 +-
 12 files changed, 69 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/docs/ml-guide.md
--
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index c08c76d..771a071 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer
 from pyspark.sql import Row, SQLContext
 
 sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
 
 # Prepare training documents, which are labeled.
 LabeledDocument = Row("id", "text", "label")

http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4441d6a..663f656 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1642,7 +1642,7 @@ moved into the udf object in `SQLContext`.
 <div data-lang="scala"  markdown="1">
 {% highlight java %}
 
-sqlCtx.udf.register("strLen", (s: String) => s.length())
+sqlContext.udf.register("strLen", (s: String) => s.length())
 
 {% endhighlight %}
 </div>
@@ -1650,7 +1650,7 @@ sqlCtx.udf.register("strLen", (s: String) => s.length())
 <div data-lang="java"  markdown="1">
 {% highlight java %}
 
-sqlCtx.udf().register("strLen", (String s) -> { s.length(); });
+sqlContext.udf().register("strLen", (String s) -> { s.length(); });
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index dee7948..8159ffb 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -55,7 +55,7 @@ public class JavaSparkSQL {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-    SQLContext sqlCtx = new SQLContext(ctx);
+    SQLContext sqlContext = new SQLContext(ctx);
 
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
@@ -74,11 +74,11 @@ public class JavaSparkSQL {
       });
 
     // Apply a schema to an RDD of Java Beans and register it as a table.
-    DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
+    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
     schemaPeople.registerTempTable("people");
 
     // SQL can be run over RDDs that have been registered as tables.
-    DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
 // The results of SQL queries are 

spark git commit: [SPARK-6781] [SQL] use sqlContext in python shell

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 3b655680c -> e1afd479b


[SPARK-6781] [SQL] use sqlContext in python shell

Use `sqlContext` in the PySpark shell, making it consistent with the SQL programming
guide. `sqlCtx` is also kept for compatibility.

Author: Davies Liu dav...@databricks.com

Closes #5425 from davies/sqlCtx and squashes the following commits:

af67340 [Davies Liu] sqlCtx - sqlContext
15a278f [Davies Liu] use sqlContext in python shell

(cherry picked from commit 6ada4f6f52cf1d992c7ab0c32318790cf08b0a0d)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1afd479
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1afd479
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1afd479

Branch: refs/heads/branch-1.3
Commit: e1afd479b3446483e6e1626afdec549cc214d80e
Parents: 3b65568
Author: Davies Liu dav...@databricks.com
Authored: Wed Apr 8 13:31:45 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 13:32:00 2015 -0700

--
 docs/ml-guide.md|  2 +-
 docs/sql-programming-guide.md   |  4 +-
 .../apache/spark/examples/sql/JavaSparkSQL.java | 20 ++---
 .../ml/simple_text_classification_pipeline.py   |  2 +-
 .../src/main/python/mllib/dataset_example.py|  6 +-
 python/pyspark/ml/classification.py |  4 +-
 python/pyspark/ml/feature.py|  4 +-
 python/pyspark/shell.py |  6 +-
 python/pyspark/sql/context.py   | 79 ++--
 python/pyspark/sql/dataframe.py |  6 +-
 python/pyspark/sql/functions.py |  2 +-
 python/pyspark/sql/types.py |  4 +-
 12 files changed, 69 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/docs/ml-guide.md
--
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index c08c76d..771a071 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer
 from pyspark.sql import Row, SQLContext
 
 sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
 
 # Prepare training documents, which are labeled.
 LabeledDocument = Row("id", "text", "label")

http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4441d6a..663f656 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1642,7 +1642,7 @@ moved into the udf object in `SQLContext`.
 <div data-lang="scala"  markdown="1">
 {% highlight java %}
 
-sqlCtx.udf.register("strLen", (s: String) => s.length())
+sqlContext.udf.register("strLen", (s: String) => s.length())
 
 {% endhighlight %}
 </div>
@@ -1650,7 +1650,7 @@ sqlCtx.udf.register("strLen", (s: String) => s.length())
 <div data-lang="java"  markdown="1">
 {% highlight java %}
 
-sqlCtx.udf().register("strLen", (String s) -> { s.length(); });
+sqlContext.udf().register("strLen", (String s) -> { s.length(); });
 
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index dee7948..8159ffb 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -55,7 +55,7 @@ public class JavaSparkSQL {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-    SQLContext sqlCtx = new SQLContext(ctx);
+    SQLContext sqlContext = new SQLContext(ctx);
 
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
@@ -74,11 +74,11 @@ public class JavaSparkSQL {
       });
 
     // Apply a schema to an RDD of Java Beans and register it as a table.
-    DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
+    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
     schemaPeople.registerTempTable("people");
 
     // SQL can be run over RDDs that have been registered as tables.
-    DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+

spark git commit: [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e1afd479b -> 4453c591a


[SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme

Fixed the following error
query.where('key > 30).select(avg('key)).collect()
<console>:43: error: value > is not a member of Symbol
              query.where('key > 30).select(avg('key)).collect()

Author: Tijo Thomas tijopara...@gmail.com

Closes #5415 from tijoparacka/ERROR_SQL_DATAFRAME_EXAMPLE and squashes the 
following commits:

234751e [Tijo Thomas] Fixed Query DSL error in spark sql Readme

(cherry picked from commit 2f482d706b9d38820472c3152dbd1612c98729bd)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4453c591
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4453c591
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4453c591

Branch: refs/heads/branch-1.3
Commit: 4453c591a2eaa9381766f6155bfd3e7749f721e0
Parents: e1afd47
Author: Tijo Thomas tijopara...@gmail.com
Authored: Wed Apr 8 13:42:29 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 13:43:05 2015 -0700

--
 sql/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4453c591/sql/README.md
--
diff --git a/sql/README.md b/sql/README.md
index a792499..0125de6 100644
--- a/sql/README.md
+++ b/sql/README.md
@@ -57,6 +57,6 @@ res2: Array[org.apache.spark.sql.Row] = Array([238,val_238], 
[86,val_86], [311,v
 
 You can also build further queries on top of these `DataFrames` using the 
query DSL.
 ```
-scala> query.where('key > 30).select(avg('key)).collect()
+scala> query.where(query("key") > 30).select(avg(query("key"))).collect()
 res3: Array[org.apache.spark.sql.Row] = Array([274.79025423728814])
 ```


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6451][SQL] supported code generation for CombineSum

2015-04-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 941828054 -> 7d7384c78


[SPARK-6451][SQL] supported code generation for CombineSum

Author: Venkata Ramana Gollamudi ramana.gollam...@huawei.com

Closes #5138 from gvramana/sum_fix_codegen and squashes the following commits:

95f5fe4 [Venkata Ramana Gollamudi] rebase merge changes
12f45a5 [Venkata Ramana Gollamudi] Combined and added code generations tests as 
per comment
d6a76ac [Venkata Ramana Gollamudi] added support for codegeneration for 
CombineSum and tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d7384c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d7384c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d7384c7

Branch: refs/heads/master
Commit: 7d7384c781ea72e1eabab3daca2e237e3b0fc666
Parents: 9418280
Author: Venkata Ramana Gollamudi ramana.gollam...@huawei.com
Authored: Wed Apr 8 18:42:34 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Wed Apr 8 18:42:34 2015 -0700

--
 .../sql/execution/GeneratedAggregate.scala  | 44 +-
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala| 92 +++-
 3 files changed, 133 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d7384c7/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index a8018b9..861a2c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -99,7 +99,10 @@ case class GeneratedAggregate(
 // but really, common sub expression elimination would be better
 val zero = Cast(Literal(0), calcType)
         val updateFunction = Coalesce(
-          Add(Coalesce(currentSum :: zero :: Nil), Cast(expr, calcType)) :: currentSum :: Nil)
+          Add(
+            Coalesce(currentSum :: zero :: Nil),
+            Cast(expr, calcType)
+          ) :: currentSum :: zero :: Nil)
         val result =
           expr.dataType match {
             case DecimalType.Fixed(_, _) =>
@@ -109,6 +112,45 @@ case class GeneratedAggregate(
 
         AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
 
+      case cs @ CombineSum(expr) =>
+        val calcType = expr.dataType
+          expr.dataType match {
+            case DecimalType.Fixed(_, _) =>
+              DecimalType.Unlimited
+            case _ =>
+              expr.dataType
+          }
+
+        val currentSum = AttributeReference("currentSum", calcType, nullable = true)()
+        val initialValue = Literal.create(null, calcType)
+
+        // Coalasce avoids double calculation...
+        // but really, common sub expression elimination would be better
+        val zero = Cast(Literal(0), calcType)
+        // If we're evaluating UnscaledValue(x), we can do Count on x directly, since its
+        // UnscaledValue will be null if and only if x is null; helps with Average on decimals
+        val actualExpr = expr match {
+          case UnscaledValue(e) => e
+          case _ => expr
+        }
+        // partial sum result can be null only when no input rows present
+        val updateFunction = If(
+          IsNotNull(actualExpr),
+          Coalesce(
+            Add(
+              Coalesce(currentSum :: zero :: Nil),
+              Cast(expr, calcType)) :: currentSum :: zero :: Nil),
+          currentSum)
+
+        val result =
+          expr.dataType match {
+            case DecimalType.Fixed(_, _) =>
+              Cast(currentSum, cs.dataType)
+            case _ => currentSum
+          }
+
+        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
       case a @ Average(expr) =>
         val calcType =
           expr.dataType match {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d7384c7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f754fa7..23f7e56 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -155,7 +155,7 @@ private[sql] abstract 

spark git commit: [SPARK-5242]: Add --private-ips flag to EC2 script

2015-04-08 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 2f482d706 -> 86403f552


[SPARK-5242]: Add --private-ips flag to EC2 script

The `spark_ec2.py` script currently references the `ip_address` and 
`public_dns_name` attributes of an instance. On private networks, these fields 
aren't set, so we have problems.

This PR introduces a `--private-ips` flag that instead refers to the 
`private_ip_address` attribute in both cases.
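
The diff below calls a `get_dns_name` helper; a simplified sketch of what such a helper can look like (not the exact code added by this PR, whose error handling may differ) is:

def get_dns_name(instance, private_ips=False):
    # Use the private IP when --private-ips was given; otherwise use the public DNS name.
    dns = instance.private_ip_address if private_ips else instance.public_dns_name
    if not dns:
        raise Exception("Failed to determine hostname of instance %s" % instance)
    return dns

def get_ip_address(instance, private_ips=False):
    # Same idea for the raw IP address fields on a boto EC2 instance object.
    return instance.private_ip_address if private_ips else instance.ip_address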

Author: Michelangelo D'Agostino mdagost...@civisanalytics.com

Closes #5244 from mdagost/ec2_private_nets and squashes the following commits:

b684c67 [Michelangelo D'Agostino] STY: A few python lint changes.
a4a2eac [Michelangelo D'Agostino] ENH: Fix IP's typo and refactor conditional 
logic into functions.
c004604 [Michelangelo D'Agostino] ENH: Add --private-ips flag.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86403f55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86403f55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86403f55

Branch: refs/heads/master
Commit: 86403f5525782bc9656ab11790f7020baa6b2c1f
Parents: 2f482d7
Author: Michelangelo D'Agostino mdagost...@civisanalytics.com
Authored: Wed Apr 8 16:48:45 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Wed Apr 8 16:48:45 2015 -0400

--
 ec2/spark_ec2.py | 64 +--
 1 file changed, 47 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/86403f55/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 879a52c..0c1f247 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -282,6 +282,10 @@ def parse_args():
     parser.add_option(
         "--vpc-id", default=None,
         help="VPC to launch instances in")
+    parser.add_option(
+        "--private-ips", action="store_true", default=False,
+        help="Use private IPs for instances rather than public if VPC/subnet " +
+             "requires that.")
 
 (opts, args) = parser.parse_args()
 if len(args) != 2:
@@ -707,7 +711,7 @@ def get_existing_cluster(conn, opts, cluster_name, 
die_on_error=True):
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
-    master = master_nodes[0].public_dns_name
+    master = get_dns_name(master_nodes[0], opts.private_ips)
     if deploy_ssh_key:
         print "Generating cluster's SSH key on master..."
 key_setup = 
@@ -719,8 +723,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, 
deploy_ssh_key):
     dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
     print "Transferring cluster's SSH key to slaves..."
     for slave in slave_nodes:
-        print slave.public_dns_name
-        ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+        slave_address = get_dns_name(slave, opts.private_ips)
+        print slave_address
+        ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
 
 modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone', 'tachyon']
@@ -809,7 +814,8 @@ def is_cluster_ssh_available(cluster_instances, opts):
 Check if SSH is available on all the instances in a cluster.
 
     for i in cluster_instances:
-        if not is_ssh_available(host=i.public_dns_name, opts=opts):
+        dns_name = get_dns_name(i, opts.private_ips)
+        if not is_ssh_available(host=dns_name, opts=opts):
             return False
     else:
         return True
@@ -923,7 +929,7 @@ def get_num_disks(instance_type):
 #
 # root_dir should be an absolute path to the directory with the files we want 
to deploy.
 def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
-    active_master = master_nodes[0].public_dns_name
+    active_master = get_dns_name(master_nodes[0], opts.private_ips)
 
     num_disks = get_num_disks(opts.instance_type)
     hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -948,10 +954,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, 
slave_nodes, modules):
         print "Deploying Spark via git hash; Tachyon won't be set up"
         modules = filter(lambda x: x != "tachyon", modules)
 
+    master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
+    slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
     template_vars = {
-        "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
+        "master_list": '\n'.join(master_addresses),
         "active_master": active_master,
-        "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+        "slave_list": 

[5/7] spark git commit: [SPARK-5654] Integrate SparkR

2015-04-08 Thread shivaram
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
new file mode 100644
index 000..604ad03
--- /dev/null
+++ b/R/pkg/R/RDD.R
@@ -0,0 +1,1539 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# RDD in R implemented in S4 OO system.
+
+setOldClass("jobj")
+
+#' @title S4 class that represents an RDD
+#' @description RDD can be created using functions like
+#'              \code{parallelize}, \code{textFile} etc.
+#' @rdname RDD
+#' @seealso parallelize, textFile
+#'
+#' @slot env An R environment that stores bookkeeping states of the RDD
+#' @slot jrdd Java object reference to the backing JavaRDD
+#' to an RDD
+#' @export
+setClass("RDD",
+         slots = list(env = "environment",
+                      jrdd = "jobj"))
+
+setClass("PipelinedRDD",
+         slots = list(prev = "RDD",
+                      func = "function",
+                      prev_jrdd = "jobj"),
+         contains = "RDD")
+
+setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
+                                        isCached, isCheckpointed) {
+  # Check that RDD constructor is using the correct version of serializedMode
+  stopifnot(class(serializedMode) == "character")
+  stopifnot(serializedMode %in% c("byte", "string", "row"))
+  # RDD has three serialization types:
+  # byte: The RDD stores data serialized in R.
+  # string: The RDD stores data as strings.
+  # row: The RDD stores the serialized rows of a DataFrame.
+
+  # We use an environment to store mutable states inside an RDD object.
+  # Note that R's call-by-value semantics makes modifying slots inside an
+  # object (passed as an argument into a function, such as cache()) difficult:
+  # i.e. one needs to make a copy of the RDD object and sets the new slot value
+  # there.
+
+  # The slots are inheritable from superclass. Here, both `env' and `jrdd' are
+  # inherited from RDD, but only the former is used.
+  .Object@env <- new.env()
+  .Object@env$isCached <- isCached
+  .Object@env$isCheckpointed <- isCheckpointed
+  .Object@env$serializedMode <- serializedMode
+
+  .Object@jrdd <- jrdd
+  .Object
+})
+
+setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
+  .Object@env <- new.env()
+  .Object@env$isCached <- FALSE
+  .Object@env$isCheckpointed <- FALSE
+  .Object@env$jrdd_val <- jrdd_val
+  if (!is.null(jrdd_val)) {
+    # This tracks the serialization mode for jrdd_val
+    .Object@env$serializedMode <- prev@env$serializedMode
+  }
+
+  .Object@prev <- prev
+
+  isPipelinable <- function(rdd) {
+    e <- rdd@env
+    !(e$isCached || e$isCheckpointed)
+  }
+
+  if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
+    # This transformation is the first in its stage:
+    .Object@func <- func
+    .Object@prev_jrdd <- getJRDD(prev)
+    .Object@env$prev_serializedMode <- prev@env$serializedMode
+    # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
+    # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
+  } else {
+    pipelinedFunc <- function(split, iterator) {
+      func(split, prev@func(split, iterator))
+    }
+    .Object@func <- pipelinedFunc
+    .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
+    # Get the serialization mode of the parent RDD
+    .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
+  }
+
+  .Object
+})
+
+#' @rdname RDD
+#' @export
+#'
+#' @param jrdd Java object reference to the backing JavaRDD
+#' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
+#' stores strings, and "row" if the RDD stores the rows of a DataFrame
+#' @param isCached TRUE if the RDD is cached
+#' @param isCheckpointed TRUE if the RDD has been checkpointed
+RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
+                isCheckpointed = FALSE) {
+  new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
+}
+
+PipelinedRDD <- function(prev, func) {
+  new("PipelinedRDD", prev, func, NULL)
+}
+
+# Return the serialization mode for an RDD.
+setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
+# For normal RDDs we can 

[2/2] spark git commit: [SPARK-6765] Fix test code style for SQL

2015-04-08 Thread rxin
[SPARK-6765] Fix test code style for SQL

So we can turn style checker on for test code.

Author: Reynold Xin r...@databricks.com

Closes #5412 from rxin/test-style-sql and squashes the following commits:

9098a31 [Reynold Xin] One more compilation error ...
8c7250a [Reynold Xin] Fix compilation.
82d0944 [Reynold Xin] Indentation.
0b03fbb [Reynold Xin] code review.
f2f4348 [Reynold Xin] oops.
ef4ec48 [Reynold Xin] Hive module.
7e0db5e [Reynold Xin] sql module
04ec7ac [Reynold Xin] catalyst module


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b2aab8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b2aab8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b2aab8d

Branch: refs/heads/master
Commit: 1b2aab8d5b9cc2ff702506038bd71aa8debe7ca0
Parents: 891ada5
Author: Reynold Xin r...@databricks.com
Authored: Wed Apr 8 20:35:29 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Wed Apr 8 20:35:29 2015 -0700

--
 .../spark/sql/catalyst/DistributionSuite.scala  |   3 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  10 +-
 .../analysis/HiveTypeCoercionSuite.scala|   8 +-
 .../expressions/ExpressionEvaluationSuite.scala | 134 +--
 .../optimizer/ConstantFoldingSuite.scala|  51 ---
 .../optimizer/FilterPushdownSuite.scala |   3 +-
 .../catalyst/optimizer/OptimizeInSuite.scala|   2 +-
 .../spark/sql/catalyst/plans/PlanTest.scala |   5 +-
 .../sql/catalyst/plans/SameResultSuite.scala|   2 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala  |   8 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   3 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   3 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |   2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala|  30 +++--
 .../sql/ScalaReflectionRelationSuite.scala  |   5 +-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   2 +-
 .../spark/sql/columnar/ColumnarTestUtils.scala  |   4 +-
 .../columnar/NullableColumnAccessorSuite.scala  |   3 +-
 .../columnar/NullableColumnBuilderSuite.scala   |   3 +-
 .../TestCompressibleColumnBuilder.scala |   2 +-
 .../sql/execution/debug/DebuggingSuite.scala|   2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  98 +++---
 .../org/apache/spark/sql/json/JsonSuite.scala   |  17 ++-
 .../spark/sql/parquet/ParquetIOSuite.scala  |   2 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |   2 +
 .../apache/spark/sql/sources/DDLTestSuite.scala |   8 +-
 .../spark/sql/sources/FilteredScanSuite.scala   |   3 +-
 .../spark/sql/sources/PrunedScanSuite.scala |   5 +-
 .../spark/sql/sources/SaveLoadSuite.scala   |   2 +-
 .../spark/sql/sources/TableScanSuite.scala  |   9 +-
 .../spark/sql/hive/ErrorPositionSuite.scala |   2 +-
 .../spark/sql/hive/HiveInspectorSuite.scala |  33 +++--
 .../sql/hive/InsertIntoHiveTableSuite.scala |  57 ++--
 .../apache/spark/sql/hive/StatisticsSuite.scala |   2 +-
 .../hive/execution/BigDataBenchmarkSuite.scala  |  12 +-
 .../sql/hive/execution/HiveComparisonTest.scala |  27 ++--
 .../sql/hive/execution/HiveQueryFileTest.scala  |  11 +-
 .../sql/hive/execution/HiveQuerySuite.scala |  13 +-
 .../hive/execution/HiveResolutionSuite.scala|   3 +-
 .../sql/hive/execution/HiveSerDeSuite.scala |   3 +-
 .../hive/execution/HiveTypeCoercionSuite.scala  |   6 +-
 .../spark/sql/hive/execution/HiveUdfSuite.scala |  16 ++-
 .../spark/sql/hive/execution/PruningSuite.scala |   2 +-
 .../sql/hive/execution/SQLQuerySuite.scala  |   4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   7 +-
 45 files changed, 395 insertions(+), 234 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index 46b2250..ea82cd2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -30,7 +30,7 @@ class DistributionSuite extends FunSuite {
   inputPartitioning: Partitioning,
   requiredDistribution: Distribution,
   satisfied: Boolean) {
-if (inputPartitioning.satisfies(requiredDistribution) != satisfied)
+if (inputPartitioning.satisfies(requiredDistribution) != satisfied) {
       fail(
         s"""
         |== Input Partitioning ==
@@ -40,6 +40,7 @@ class DistributionSuite extends FunSuite {
 |== Does input partitioning satisfy required distribution? ==
   

spark git commit: [SPARK-6696] [SQL] Adds HiveContext.refreshTable to PySpark

2015-04-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 7d7384c78 -> 891ada5be


[SPARK-6696] [SQL] Adds HiveContext.refreshTable to PySpark


Author: Cheng Lian l...@databricks.com

Closes #5349 from liancheng/py-refresh-table and squashes the following commits:

004bec0 [Cheng Lian] Adds HiveContext.refreshTable to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/891ada5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/891ada5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/891ada5b

Branch: refs/heads/master
Commit: 891ada5be1e7fdd796380e2626d80843f2ef6017
Parents: 7d7384c
Author: Cheng Lian l...@databricks.com
Authored: Wed Apr 8 18:47:39 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Wed Apr 8 18:47:39 2015 -0700

--
 python/pyspark/sql/context.py | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/891ada5b/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 93e2d17..e8529a8 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -574,6 +574,15 @@ class HiveContext(SQLContext):
 def _get_hive_ctx(self):
 return self._jvm.HiveContext(self._jsc.sc())
 
+    def refreshTable(self, tableName):
+        """Invalidate and refresh all the cached the metadata of the given
+        table. For performance reasons, Spark SQL or the external data source
+        library it uses might cache certain metadata about a table, such as the
+        location of blocks. When those change outside of Spark SQL, users should
+        call this function to invalidate the cache.
+        """
+        self._ssql_ctx.refreshTable(tableName)
+
 
 class UDFRegistration(object):
     """Wrapper for user-defined function registration."""
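
A short usage sketch of the new PySpark method (the app and table names here are made up):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="RefreshTableExample")
sqlContext = HiveContext(sc)

# If an external process rewrites the files backing the "logs" table, any metadata
# Spark SQL cached for it (such as block locations) can be invalidated like this:
sqlContext.refreshTable("logs")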


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[7/7] spark git commit: [SPARK-5654] Integrate SparkR

2015-04-08 Thread shivaram
[SPARK-5654] Integrate SparkR

This pull request integrates SparkR, an R frontend for Spark. The SparkR 
package contains both RDD and DataFrame APIs in R and is integrated with 
Spark's submission scripts to work on different cluster managers.

Some integration points that would be great to get feedback on:

1. Build procedure: SparkR requires R to be installed on the machine where it is 
built. Right now we have a new Maven profile `-PsparkR` that can be used to 
enable SparkR builds

2. YARN cluster mode: The R package that is built needs to be present on the 
driver and all the worker nodes during execution. The R package location is 
currently set using SPARK_HOME, but this might not work on YARN cluster mode.

The SparkR package represents the work of many contributors and attached below 
is a list of people along with areas they worked on

edwardt (edwart) - Documentation improvements
Felix Cheung (felixcheung) - Documentation improvements
Hossein Falaki (falaki)  - Documentation improvements
Chris Freeman (cafreeman) - DataFrame API, Programming Guide
Todd Gao (7c00) - R worker Internals
Ryan Hafen (hafen) - SparkR Internals
Qian Huang (hqzizania) - RDD API
Hao Lin (hlin09) - RDD API, Closure cleaner
Evert Lammerts (evertlammerts) - DataFrame API
Davies Liu (davies) - DataFrame API, R worker internals, Merging with Spark
Yi Lu (lythesia) - RDD API, Worker internals
Matt Massie (massie) - Jenkins build
Harihar Nahak (hnahak87) - SparkR examples
Oscar Olmedo (oscaroboto) - Spark configuration
Antonio Piccolboni (piccolbo) - SparkR examples, Namespace bug fixes
Dan Putler (dputler) - DataFrame API, SparkR Install Guide
Ashutosh Raina (ashutoshraina) - Build improvements
Josh Rosen (joshrosen) - Travis CI build
Sun Rui (sun-rui) - RDD API, JVM Backend, Shuffle improvements
Shivaram Venkataraman (shivaram) - RDD API, JVM Backend, Worker Internals
Zongheng Yang (concretevitamin) - RDD API, Pipelined RDDs, Examples and EC2 guide

Author: Shivaram Venkataraman shiva...@cs.berkeley.edu
Author: Shivaram Venkataraman shivaram.venkatara...@gmail.com
Author: Zongheng Yang zonghen...@gmail.com
Author: cafreeman cfree...@alteryx.com
Author: Shivaram Venkataraman shiva...@eecs.berkeley.edu
Author: Davies Liu dav...@databricks.com
Author: Davies Liu davies@gmail.com
Author: hlin09 hlin0...@gmail.com
Author: Sun Rui rui@intel.com
Author: lythesia iranaik...@gmail.com
Author: oscaroboto osca...@gmail.com
Author: Antonio Piccolboni anto...@piccolboni.info
Author: root edward
Author: edwardt edwardt.t...@gmail.com
Author: hqzizania qian.hu...@intel.com
Author: dputler dan.put...@gmail.com
Author: Todd Gao todd.gao.2...@gmail.com
Author: Chris Freeman cfree...@alteryx.com
Author: Felix Cheung fcheung@AVVOMAC-119.local
Author: Hossein hoss...@databricks.com
Author: Evert Lammerts ev...@apache.org
Author: Felix Cheung fche...@avvomac-119.t-mobile.com
Author: felixcheung felixcheun...@hotmail.com
Author: Ryan Hafen rha...@gmail.com
Author: Ashutosh Raina ashutoshra...@users.noreply.github.com
Author: Oscar Olmedo osca...@gmail.com
Author: Josh Rosen rosenvi...@gmail.com
Author: Yi Lu iranaik...@gmail.com
Author: Harihar Nahak hnaha...@users.noreply.github.com

Closes #5096 from shivaram/R and squashes the following commits:

da64742 [Davies Liu] fix Date serialization
59266d1 [Davies Liu] check exclusive of primary-py-file and primary-r-file
55808e4 [Davies Liu] fix tests
5581c75 [Davies Liu] update author of SparkR
f731b48 [Shivaram Venkataraman] Only run SparkR tests if R is installed
64eda24 [Shivaram Venkataraman] Merge branch 'R' of 
https://github.com/amplab-extras/spark into R
d7c3f22 [Shivaram Venkataraman] Address code review comments Changes include 1. 
Adding SparkR docs to API docs generated 2. Style fixes in SparkR scala files 
3. Clean up of shell scripts and explanation of install-dev.sh
377151f [Shivaram Venkataraman] Merge remote-tracking branch 'apache/master' 
into R
eb5da53 [Shivaram Venkataraman] Merge pull request #3 from davies/R2
a18ff5c [Davies Liu] Update sparkR.R
5133f3a [Shivaram Venkataraman] Merge pull request #7 from hqzizania/R3
940b631 [hqzizania] [SPARKR-92] Phase 2: implement sum(rdd)
0e788c0 [Shivaram Venkataraman] Merge pull request #5 from hlin09/doc-fix
3487461 [hlin09] Add tests log in .gitignore.
1d1802e [Shivaram Venkataraman] Merge pull request #4 from felixcheung/r-require
11981b7 [felixcheung] Update R to fail early if SparkR package is missing
c300e08 [Davies Liu] remove duplicated file
b045701 [Davies Liu] Merge branch 'remote_r' into R
19c9368 [Davies Liu] Merge branch 'sparkr-sql' of 
github.com:amplab-extras/SparkR-pkg into remote_r
f8fa8af [Davies Liu] mute logging when start/stop context
e7104b6 [Davies Liu] remove ::: in SparkR
a1777eb [Davies Liu] move rules into R/.gitignore
e88b649 [Davies Liu] Merge branch 'R' of github.com:amplab-extras/spark into R
6e20e71 [Davies Liu] address comments
b433817 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R

[1/2] spark git commit: [SPARK-6765] Fix test code style for SQL

2015-04-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 891ada5be -> 1b2aab8d5


http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
index 02518d5..f7b37da 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.util._
 /**
  * A framework for running the query tests that are listed as a set of text 
files.
  *
- * TestSuites that derive from this class must provide a map of testCaseName -> testCaseFiles that should be included.
- * Additionally, there is support for whitelisting and blacklisting tests as development progresses.
+ * TestSuites that derive from this class must provide a map of testCaseName -> testCaseFiles
+ * that should be included. Additionally, there is support for whitelisting and blacklisting
+ * tests as development progresses.
  */
 abstract class HiveQueryFileTest extends HiveComparisonTest {
   /** A list of tests deemed out of scope and thus completely disregarded */
@@ -54,15 +55,17 @@ abstract class HiveQueryFileTest extends HiveComparisonTest {
     case (testCaseName, testCaseFile) =>
       if (blackList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_)) {
         logDebug(s"Blacklisted test skipped $testCaseName")
-      } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) || runAll) {
+      } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) ||
+        runAll) {
         // Build a test case and submit it to scala test framework...
         val queriesString = fileToString(testCaseFile)
         createQueryTest(testCaseName, queriesString)
       } else {
         // Only output warnings for the built in whitelist as this clutters the output when the user
         // trying to execute a single test from the commandline.
-        if(System.getProperty(whiteListProperty) == null && !runAll)
+        if (System.getProperty(whiteListProperty) == null && !runAll) {
           ignore(testCaseName) {}
+        }
       }
   }
 }
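
To make the filtering above concrete, here is a small, purely illustrative
Python analogue of the blacklist/whitelist matching (the pattern lists and
test names are invented, not taken from the suite):

import re

black_list = [r"hook.*", r".*window.*"]
white_list = [r"udf.*", r"lateral.*"]
run_all = False

def matches_any(patterns, name):
    # Mirrors pattern.matcher(name).matches(): the whole name must match.
    return any(re.fullmatch(p, name) for p in patterns)

for test_case_name in ["udf_length", "hook_context", "lateral_view4"]:
    if matches_any(black_list, test_case_name):
        print(f"Blacklisted test skipped {test_case_name}")
    elif matches_any(white_list, test_case_name) or run_all:
        print(f"Running {test_case_name}")
    else:
        print(f"Ignoring {test_case_name}")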

http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index de140fc..af781a5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class TestData(a: Int, b: String)
 
 /**
- * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
+ * A set of test cases expressed in Hive QL that are not covered by the tests
+ * included in the hive distribution.
  */
 class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
@@ -237,7 +238,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   }
 
   createQueryTest("modulus",
-    "SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \"true\", \"false\"), (101 / 2) % 10 FROM src LIMIT 1")
+    "SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \"true\", \"false\"), " +
+      "(101 / 2) % 10 FROM src LIMIT 1")
 
   test("Query expressed in SQL") {
     setConf("spark.sql.dialect", "sql")
@@ -309,7 +311,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     "SELECT * FROM src a JOIN src b ON a.key = b.key")
 
   createQueryTest("small.cartesian",
-    "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN (SELECT key FROM src WHERE key = 2) b")
+    "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN " +
+      "(SELECT key FROM src WHERE key = 2) b")
 
   createQueryTest("length.udf",
     "SELECT length(\"test\") FROM src LIMIT 1")
@@ -457,6 +460,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("lateral view3",
     "FROM src SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX")
 
+  // scalastyle:off
   createQueryTest("lateral view4",
     """
       |create table src_lv1 (key string, value string);
@@ -466,6 +470,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   |insert overwrite table src_lv1 SELECT key, 

spark git commit: [SQL][minor] remove duplicated resolveGetField and update comment

2015-04-08 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 55a92ef34 -> 941828054


[SQL][minor] remove duplicated resolveGetField and update comment

It's after https://github.com/apache/spark/pull/5189

Author: Wenchen Fan cloud0...@outlook.com

Closes #5304 from cloud-fan/tmp and squashes the following commits:

c58c9b3 [Wenchen Fan] remove duplicated code and update comment


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94182805
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94182805
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94182805

Branch: refs/heads/master
Commit: 9418280547f962eaf309bfff9986cdd848409643
Parents: 55a92ef
Author: Wenchen Fan cloud0...@outlook.com
Authored: Wed Apr 8 13:57:01 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Wed Apr 8 13:57:01 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 32 +---
 .../catalyst/plans/logical/LogicalPlan.scala| 13 +++-
 2 files changed, 6 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94182805/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 119cb9c..b3aba4f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -293,7 +293,7 @@ class Analyzer(
         logDebug(s"Resolving $u to $result")
         result
       case UnresolvedGetField(child, fieldName) if child.resolved =>
-        resolveGetField(child, fieldName)
+        q.resolveGetField(child, fieldName, resolver)
     }
 }
 
@@ -313,36 +313,6 @@ class Analyzer(
  */
     protected def containsStar(exprs: Seq[Expression]): Boolean =
       exprs.exists(_.collect { case _: Star => true }.nonEmpty)
-
-    /**
-     * Returns the resolved `GetField`, and report error if no desired field or over one
-     * desired fields are found.
-     */
-    protected def resolveGetField(expr: Expression, fieldName: String): Expression = {
-      def findField(fields: Array[StructField]): Int = {
-        val checkField = (f: StructField) => resolver(f.name, fieldName)
-        val ordinal = fields.indexWhere(checkField)
-        if (ordinal == -1) {
-          throw new AnalysisException(
-            s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}")
-        } else if (fields.indexWhere(checkField, ordinal + 1) != -1) {
-          throw new AnalysisException(
-            s"Ambiguous reference to fields ${fields.filter(checkField).mkString(", ")}")
-        } else {
-          ordinal
-        }
-      }
-      expr.dataType match {
-        case StructType(fields) =>
-          val ordinal = findField(fields)
-          StructGetField(expr, fields(ordinal), ordinal)
-        case ArrayType(StructType(fields), containsNull) =>
-          val ordinal = findField(fields)
-          ArrayGetField(expr, fields(ordinal), ordinal, containsNull)
-        case otherType =>
-          throw new AnalysisException(s"GetField is not valid on fields of type $otherType")
-      }
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/94182805/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 2e9f3aa..d8f5858 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -205,11 +205,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
       // One match, but we also need to extract the requested nested field.
       case Seq((a, nestedFields)) =>
         try {
-
-          // The foldLeft adds UnresolvedGetField for every remaining parts of the name,
-          // and aliased it with the last part of the name.
-          // For example, consider name a.b.c, where a is resolved to an existing attribute.
-          // Then this will add UnresolvedGetField(b) and UnresolvedGetField(c), and alias
+          // The foldLeft adds GetFields for every remaining parts of the identifier,
+          // and aliases it with the last part of the 
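
For readers following the comment above, a minimal Python sketch of the fold it
describes (the classes below are simplified stand-ins for Catalyst's expressions,
not the real API):

from functools import reduce

class Attribute:
    def __init__(self, name): self.name = name
    def __repr__(self): return f"Attribute({self.name!r})"

class UnresolvedGetField:
    def __init__(self, child, field): self.child, self.field = child, field
    def __repr__(self): return f"UnresolvedGetField({self.child!r}, {self.field!r})"

class Alias:
    def __init__(self, child, name): self.child, self.name = child, name
    def __repr__(self): return f"Alias({self.child!r}, {self.name!r})"

# For the name a.b.c: "a" resolves to an existing attribute, ["b", "c"] remain.
resolved, nested_fields = Attribute("a"), ["b", "c"]

# Fold the remaining parts into nested field accesses, then alias the result
# with the last part of the name.
expr = reduce(UnresolvedGetField, nested_fields, resolved)
aliased = Alias(expr, nested_fields[-1])
print(aliased)
# Alias(UnresolvedGetField(UnresolvedGetField(Attribute('a'), 'b'), 'c'), 'c')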

[6/7] spark git commit: [SPARK-5654] Integrate SparkR

2015-04-08 Thread shivaram
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
new file mode 100644
index 000..a354cdc
--- /dev/null
+++ b/R/pkg/NAMESPACE
@@ -0,0 +1,182 @@
+#exportPattern(^[[:alpha:]]+)
+exportClasses(RDD)
+exportClasses(Broadcast)
+exportMethods(
+  aggregateByKey,
+  aggregateRDD,
+  cache,
+  checkpoint,
+  coalesce,
+  cogroup,
+  collect,
+  collectAsMap,
+  collectPartition,
+  combineByKey,
+  count,
+  countByKey,
+  countByValue,
+  distinct,
+  Filter,
+  filterRDD,
+  first,
+  flatMap,
+  flatMapValues,
+  fold,
+  foldByKey,
+  foreach,
+  foreachPartition,
+  fullOuterJoin,
+  glom,
+  groupByKey,
+  join,
+  keyBy,
+  keys,
+  length,
+  lapply,
+  lapplyPartition,
+  lapplyPartitionsWithIndex,
+  leftOuterJoin,
+  lookup,
+  map,
+  mapPartitions,
+  mapPartitionsWithIndex,
+  mapValues,
+  maximum,
+  minimum,
+  numPartitions,
+  partitionBy,
+  persist,
+  pipeRDD,
+  reduce,
+  reduceByKey,
+  reduceByKeyLocally,
+  repartition,
+  rightOuterJoin,
+  sampleRDD,
+  saveAsTextFile,
+  saveAsObjectFile,
+  sortBy,
+  sortByKey,
+  sumRDD,
+  take,
+  takeOrdered,
+  takeSample,
+  top,
+  unionRDD,
+  unpersist,
+  value,
+  values,
+  zipRDD,
+  zipWithIndex,
+  zipWithUniqueId
+ )
+
+# S3 methods exported
+export(
+   textFile,
+   objectFile,
+   parallelize,
+   hashCode,
+   includePackage,
+   broadcast,
+   setBroadcastValue,
+   setCheckpointDir
+  )
+export(sparkR.init)
+export(sparkR.stop)
+export(print.jobj)
+useDynLib(SparkR, stringHashCode)
+importFrom(methods, setGeneric, setMethod, setOldClass)
+
+# SparkRSQL
+
+exportClasses(DataFrame)
+
+exportMethods(columns,
+  distinct,
+  dtypes,
+  explain,
+  filter,
+  groupBy,
+  head,
+  insertInto,
+  intersect,
+  isLocal,
+  limit,
+  orderBy,
+  names,
+  printSchema,
+  registerTempTable,
+  repartition,
+  sampleDF,
+  saveAsParquetFile,
+  saveAsTable,
+  saveDF,
+  schema,
+  select,
+  selectExpr,
+  show,
+  showDF,
+  sortDF,
+  subtract,
+  toJSON,
+  toRDD,
+  unionAll,
+  where,
+  withColumn,
+  withColumnRenamed)
+
+exportClasses(Column)
+
+exportMethods(abs,
+  alias,
+  approxCountDistinct,
+  asc,
+  avg,
+  cast,
+  contains,
+  countDistinct,
+  desc,
+  endsWith,
+  getField,
+  getItem,
+  isNotNull,
+  isNull,
+  last,
+  like,
+  lower,
+  max,
+  mean,
+  min,
+  rlike,
+  sqrt,
+  startsWith,
+  substr,
+  sum,
+  sumDistinct,
+  upper)
+
+exportClasses(GroupedData)
+exportMethods(agg)
+
+export(sparkRSQL.init,
+   sparkRHive.init)
+
+export(cacheTable,
+   clearCache,
+   createDataFrame,
+   createExternalTable,
+   dropTempTable,
+   jsonFile,
+   jsonRDD,
+   loadDF,
+   parquetFile,
+   sql,
+   table,
+   tableNames,
+   tables,
+   toDF,
+   uncacheTable)
+
+export(print.structType,
+   print.structField)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
new file mode 100644
index 000..feafd56
--- /dev/null
+++ b/R/pkg/R/DataFrame.R
@@ -0,0 +1,1270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional 

[4/7] spark git commit: [SPARK-5654] Integrate SparkR

2015-04-08 Thread shivaram
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/context.R
--
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
new file mode 100644
index 000..2fc0bb2
--- /dev/null
+++ b/R/pkg/R/context.R
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# context.R: SparkContext driven functions
+
+getMinSplits <- function(sc, minSplits) {
+  if (is.null(minSplits)) {
+    defaultParallelism <- callJMethod(sc, "defaultParallelism")
+    minSplits <- min(defaultParallelism, 2)
+  }
+  as.integer(minSplits)
+}
+
+#' Create an RDD from a text file.
+#'
+#' This function reads a text file from HDFS, a local file system (available on all
+#' nodes), or any Hadoop-supported file system URI, and creates an
+#' RDD of strings from it.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#'  value is chosen based on available parallelism.
+#' @return RDD where each item is of type \code{character}
+#' @export
+#' @examples
+#'\dontrun{
+#'  sc <- sparkR.init()
+#'  lines <- textFile(sc, "myfile.txt")
+#'}
+textFile <- function(sc, path, minSplits = NULL) {
+  # Allow the user to have a more flexible definition of the text file path
+  path <- suppressWarnings(normalizePath(path))
+  #' Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  # jrdd is of type JavaRDD[String]
+  RDD(jrdd, "string")
+}
+
+#' Load an RDD saved as a SequenceFile containing serialized objects.
+#'
+#' The file to be loaded should be one that was previously generated by calling
+#' saveAsObjectFile() of the RDD class.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#'  value is chosen based on available parallelism.
+#' @return RDD containing serialized R objects.
+#' @seealso saveAsObjectFile
+#' @export
+#' @examples
+#'\dontrun{
+#'  sc <- sparkR.init()
+#'  rdd <- objectFile(sc, "myfile")
+#'}
+objectFile <- function(sc, path, minSplits = NULL) {
+  # Allow the user to have a more flexible definition of the text file path
+  path <- suppressWarnings(normalizePath(path))
+  #' Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  # Assume the RDD contains serialized R objects.
+  RDD(jrdd, "byte")
+}
+
+#' Create an RDD from a homogeneous list or vector.
+#'
+#' This function creates an RDD from a local homogeneous list in R. The elements
+#' in the list are split into \code{numSlices} slices and distributed to nodes
+#' in the cluster.
+#'
+#' @param sc SparkContext to use
+#' @param coll collection to parallelize
+#' @param numSlices number of partitions to create in the RDD
+#' @return an RDD created from this collection
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2)
+#' # The RDD should contain 10 elements
+#' length(rdd)
+#'}
+parallelize <- function(sc, coll, numSlices = 1) {
+  # TODO: bound/safeguard numSlices
+  # TODO: unit tests for if the split works for all primitives
+  # TODO: support matrix, data frame, etc
+  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
+    if (is.data.frame(coll)) {
+      message(paste("context.R: A data frame is parallelized by columns."))
+    } else {
+      if (is.matrix(coll)) {
+        message(paste("context.R: A matrix is parallelized by elements."))
+      } else {
+        message(paste("context.R: parallelize() currently only supports lists and vectors.",
+                      "Calling as.list() to coerce coll into a list."))
+      }
+    }
+    coll <- as.list(coll)
+  }
+
+  if (numSlices > length(coll))
+    numSlices <- length(coll)
+
+  sliceLen <- ceiling(length(coll) / numSlices)
+  slices <- split(coll, rep(1:(numSlices + 1), each = 

[1/7] spark git commit: [SPARK-5654] Integrate SparkR

2015-04-08 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 1b2aab8d5 -> 2fe0a1aae


http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala 
b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
new file mode 100644
index 000..ccb2a37
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.sql.{Date, Time}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Utility functions to serialize, deserialize objects to / from R
+ */
+private[spark] object SerDe {
+
+  // Type mapping from R to Java
+  //
+  // NULL -> void
+  // integer -> Int
+  // character -> String
+  // logical -> Boolean
+  // double, numeric -> Double
+  // raw -> Array[Byte]
+  // Date -> Date
+  // POSIXlt/POSIXct -> Time
+  //
+  // list[T] -> Array[T], where T is one of above mentioned types
+  // environment -> Map[String, T], where T is a native type
+  // jobj -> Object, where jobj is an object created in the backend
+
+  def readObjectType(dis: DataInputStream): Char = {
+dis.readByte().toChar
+  }
+
+  def readObject(dis: DataInputStream): Object = {
+val dataType = readObjectType(dis)
+readTypedObject(dis, dataType)
+  }
+
+  def readTypedObject(
+  dis: DataInputStream,
+  dataType: Char): Object = {
+    dataType match {
+      case 'n' => null
+      case 'i' => new java.lang.Integer(readInt(dis))
+      case 'd' => new java.lang.Double(readDouble(dis))
+      case 'b' => new java.lang.Boolean(readBoolean(dis))
+      case 'c' => readString(dis)
+      case 'e' => readMap(dis)
+      case 'r' => readBytes(dis)
+      case 'l' => readList(dis)
+      case 'D' => readDate(dis)
+      case 't' => readTime(dis)
+      case 'j' => JVMObjectTracker.getObject(readString(dis))
+      case _ => throw new IllegalArgumentException(s"Invalid type $dataType")
+    }
+  }
+
+  def readBytes(in: DataInputStream): Array[Byte] = {
+val len = readInt(in)
+val out = new Array[Byte](len)
+val bytesRead = in.readFully(out)
+out
+  }
+
+  def readInt(in: DataInputStream): Int = {
+in.readInt()
+  }
+
+  def readDouble(in: DataInputStream): Double = {
+in.readDouble()
+  }
+
+  def readString(in: DataInputStream): String = {
+val len = in.readInt()
+val asciiBytes = new Array[Byte](len)
+in.readFully(asciiBytes)
+assert(asciiBytes(len - 1) == 0)
+val str = new String(asciiBytes.dropRight(1).map(_.toChar))
+str
+  }
+
+  def readBoolean(in: DataInputStream): Boolean = {
+val intVal = in.readInt()
+if (intVal == 0) false else true
+  }
+
+  def readDate(in: DataInputStream): Date = {
+Date.valueOf(readString(in))
+  }
+
+  def readTime(in: DataInputStream): Time = {
+val t = in.readDouble()
+new Time((t * 1000L).toLong)
+  }
+
+  def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readBytes(in)).toArray
+  }
+
+  def readIntArr(in: DataInputStream): Array[Int] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readInt(in)).toArray
+  }
+
+  def readDoubleArr(in: DataInputStream): Array[Double] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readDouble(in)).toArray
+  }
+
+  def readBooleanArr(in: DataInputStream): Array[Boolean] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readBoolean(in)).toArray
+  }
+
+  def readStringArr(in: DataInputStream): Array[String] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readString(in)).toArray
+  }
+
+  def readList(dis: DataInputStream): Array[_] = {
+    val arrType = readObjectType(dis)
+    arrType match {
+      case 'i' => readIntArr(dis)
+      case 'c' => readStringArr(dis)
+      case 'd' => readDoubleArr(dis)
+      case 'b' => readBooleanArr(dis)
+      case 'j' => readStringArr(dis).map(x => JVMObjectTracker.getObject(x))
+      case 'r' => readBytesArr(dis)
+      case _ => throw new
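
As a rough, purely illustrative sketch of the framing these readers expect
(inferred from the readInt/readString/readTypedObject code above; DataInputStream
implies big-endian byte order; this is not the actual SparkR R-side serializer):

import struct

def write_int(value):
    # readInt() consumes a 4-byte big-endian integer.
    return struct.pack(">i", value)

def write_string(value):
    # readString() consumes an int length, then that many ASCII bytes,
    # the last of which must be a NUL terminator.
    data = value.encode("ascii") + b"\x00"
    return write_int(len(data)) + data

def write_typed(value):
    # readTypedObject() dispatches on a single type-tag byte:
    # 'b' -> Boolean (encoded as an int), 'i' -> Int, 'c' -> String.
    if isinstance(value, bool):
        return b"b" + write_int(1 if value else 0)
    if isinstance(value, int):
        return b"i" + write_int(value)
    if isinstance(value, str):
        return b"c" + write_string(value)
    raise TypeError(f"unsupported type: {type(value)}")

payload = write_typed(42) + write_typed("hello")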