spark git commit: [SPARK-6765] Fix test code style for streaming.
Repository: spark Updated Branches: refs/heads/master 8d2a36c0f - 15e0d2bd1 [SPARK-6765] Fix test code style for streaming. So we can turn style checker on for test code. Author: Reynold Xin r...@databricks.com Closes #5409 from rxin/test-style-streaming and squashes the following commits: 7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15e0d2bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15e0d2bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15e0d2bd Branch: refs/heads/master Commit: 15e0d2bd1304db62fad286c1bb687e87c361e16c Parents: 8d2a36c Author: Reynold Xin r...@databricks.com Authored: Wed Apr 8 00:24:59 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 00:24:59 2015 -0700 -- .../flume/FlumePollingStreamSuite.scala | 29 ++--- .../streaming/flume/FlumeStreamSuite.scala | 4 +- .../spark/streaming/mqtt/MQTTStreamSuite.scala | 3 +- .../spark/streaming/BasicOperationsSuite.scala | 6 ++- .../spark/streaming/CheckpointSuite.scala | 45 +++- .../apache/spark/streaming/FailureSuite.scala | 4 +- .../spark/streaming/InputStreamsSuite.scala | 15 --- .../streaming/ReceivedBlockHandlerSuite.scala | 4 +- .../streaming/ReceivedBlockTrackerSuite.scala | 6 ++- .../apache/spark/streaming/ReceiverSuite.scala | 11 ++--- .../spark/streaming/StreamingContextSuite.scala | 5 ++- .../streaming/StreamingListenerSuite.scala | 4 +- .../apache/spark/streaming/TestSuiteBase.scala | 28 ++-- .../spark/streaming/UISeleniumSuite.scala | 3 +- .../spark/streaming/WindowOperationsSuite.scala | 4 +- .../rdd/WriteAheadLogBackedBlockRDDSuite.scala | 12 -- .../streaming/scheduler/JobGeneratorSuite.scala | 2 +- .../streaming/util/WriteAheadLogSuite.scala | 2 +- .../spark/streamingtest/ImplicitSuite.scala | 3 +- 19 files changed, 115 insertions(+), 75 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/15e0d2bd/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala -- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index e04d408..2edea9b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -1,21 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + *http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * AS IS BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.spark.streaming.flume import java.net.InetSocketAddress @@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging assert(counter === totalEventsPerChannel * channels.size) } - def assertChannelIsEmpty(channel: MemoryChannel) = { + def assertChannelIsEmpty(channel: MemoryChannel): Unit = { val queueRemaining = channel.getClass.getDeclaredField(queueRemaining) queueRemaining.setAccessible(true)
spark git commit: [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...
Repository: spark Updated Branches: refs/heads/master 15e0d2bd1 - f7e21dd1e [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed... In particular, this makes pyspark in yarn-cluster mode fail unless SPARK_HOME is set, when it's not really needed. Author: Marcelo Vanzin van...@cloudera.com Closes #5405 from vanzin/SPARK-6506 and squashes the following commits: e184507 [Marcelo Vanzin] [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7e21dd1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7e21dd1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7e21dd1 Branch: refs/heads/master Commit: f7e21dd1ec4541be54eb01d8b15cfcc6714feed0 Parents: 15e0d2b Author: Marcelo Vanzin van...@cloudera.com Authored: Wed Apr 8 10:14:52 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Wed Apr 8 10:14:52 2015 -0700 -- python/pyspark/java_gateway.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f7e21dd1/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 0a16cbd..2a5e84a 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -29,11 +29,10 @@ from pyspark.serializers import read_int def launch_gateway(): -SPARK_HOME = os.environ[SPARK_HOME] - if PYSPARK_GATEWAY_PORT in os.environ: gateway_port = int(os.environ[PYSPARK_GATEWAY_PORT]) else: +SPARK_HOME = os.environ[SPARK_HOME] # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and settings from spark-env.sh on_windows = platform.system() == Windows - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed...
Repository: spark Updated Branches: refs/heads/branch-1.3 cdef7d080 - e967ecaca [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed... In particular, this makes pyspark in yarn-cluster mode fail unless SPARK_HOME is set, when it's not really needed. Author: Marcelo Vanzin van...@cloudera.com Closes #5405 from vanzin/SPARK-6506 and squashes the following commits: e184507 [Marcelo Vanzin] [SPARK-6506] [pyspark] Do not try to retrieve SPARK_HOME when not needed. (cherry picked from commit f7e21dd1ec4541be54eb01d8b15cfcc6714feed0) Signed-off-by: Josh Rosen joshro...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e967ecac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e967ecac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e967ecac Branch: refs/heads/branch-1.3 Commit: e967ecacad8075ef521fbc1a501e074c861d0fe7 Parents: cdef7d0 Author: Marcelo Vanzin van...@cloudera.com Authored: Wed Apr 8 10:14:52 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Wed Apr 8 10:15:10 2015 -0700 -- python/pyspark/java_gateway.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e967ecac/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 936857e..19ee2e3 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -29,11 +29,10 @@ from pyspark.serializers import read_int def launch_gateway(): -SPARK_HOME = os.environ[SPARK_HOME] - if PYSPARK_GATEWAY_PORT in os.environ: gateway_port = int(os.environ[PYSPARK_GATEWAY_PORT]) else: +SPARK_HOME = os.environ[SPARK_HOME] # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and settings from spark-env.sh on_windows = platform.system() == Windows - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional 
commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests
Repository: spark Updated Branches: refs/heads/branch-1.2 f7fe87f4b - 7a1583917 [SPARK-6753] Clone SparkConf in ShuffleSuite tests Prior to this change, the unit test for SPARK-3426 did not clone the original SparkConf, which meant that that test did not use the options set by suites that subclass ShuffleSuite.scala. This commit fixes that problem. JoshRosen would be great if you could take a look at this, since you wrote this test originally. Author: Kay Ousterhout kayousterh...@gmail.com Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits: 368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests (cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1) Signed-off-by: Josh Rosen joshro...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a158391 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a158391 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a158391 Branch: refs/heads/branch-1.2 Commit: 7a1583917e4a73a464d3db57406fe708a9283c7c Parents: f7fe87f Author: Kay Ousterhout kayousterh...@gmail.com Authored: Wed Apr 8 10:26:45 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Wed Apr 8 10:27:21 2015 -0700 -- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a158391/core/src/test/scala/org/apache/spark/ShuffleSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 96cb8e4..64202e3 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -243,14 +243,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex shuffleSpillCompress - Set(true, false); shuffleCompress - Set(true, false) ) { 
- val conf = new SparkConf() + val myConf = conf.clone() .setAppName(test) .setMaster(local) .set(spark.shuffle.spill.compress, shuffleSpillCompress.toString) .set(spark.shuffle.compress, shuffleCompress.toString) .set(spark.shuffle.memoryFraction, 0.001) resetSparkContext() - sc = new SparkContext(conf) + sc = new SparkContext(myConf) try { sc.parallelize(0 until 10).map(i = (i / 4, i)).groupByKey().collect() } catch { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests
Repository: spark Updated Branches: refs/heads/branch-1.1 39761f515 - ee06e9271 [SPARK-6753] Clone SparkConf in ShuffleSuite tests Prior to this change, the unit test for SPARK-3426 did not clone the original SparkConf, which meant that that test did not use the options set by suites that subclass ShuffleSuite.scala. This commit fixes that problem. JoshRosen would be great if you could take a look at this, since you wrote this test originally. Author: Kay Ousterhout kayousterh...@gmail.com Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits: 368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests (cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1) Signed-off-by: Josh Rosen joshro...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee06e927 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee06e927 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee06e927 Branch: refs/heads/branch-1.1 Commit: ee06e9271017d2730147ac08a228c771630b3688 Parents: 39761f5 Author: Kay Ousterhout kayousterh...@gmail.com Authored: Wed Apr 8 10:26:45 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Wed Apr 8 10:27:37 2015 -0700 -- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ee06e927/core/src/test/scala/org/apache/spark/ShuffleSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 625702b..eba3948 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -244,14 +244,14 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { shuffleSpillCompress - Set(true, false); shuffleCompress - Set(true, false) ) { - val 
conf = new SparkConf() + val myConf = conf.clone() .setAppName(test) .setMaster(local) .set(spark.shuffle.spill.compress, shuffleSpillCompress.toString) .set(spark.shuffle.compress, shuffleCompress.toString) .set(spark.shuffle.memoryFraction, 0.001) resetSparkContext() - sc = new SparkContext(conf) + sc = new SparkContext(myConf) try { sc.parallelize(0 until 10).map(i = (i / 4, i)).groupByKey().collect() } catch { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6753] Clone SparkConf in ShuffleSuite tests
Repository: spark Updated Branches: refs/heads/branch-1.3 e967ecaca - 3b655680c [SPARK-6753] Clone SparkConf in ShuffleSuite tests Prior to this change, the unit test for SPARK-3426 did not clone the original SparkConf, which meant that that test did not use the options set by suites that subclass ShuffleSuite.scala. This commit fixes that problem. JoshRosen would be great if you could take a look at this, since you wrote this test originally. Author: Kay Ousterhout kayousterh...@gmail.com Closes #5401 from kayousterhout/SPARK-6753 and squashes the following commits: 368c540 [Kay Ousterhout] [SPARK-6753] Clone SparkConf in ShuffleSuite tests (cherry picked from commit 9d44ddce1d1e19011026605549c37d0db6d6afa1) Signed-off-by: Josh Rosen joshro...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b655680 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b655680 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b655680 Branch: refs/heads/branch-1.3 Commit: 3b655680c4f5c71c903af5a71c96447e03350f93 Parents: e967eca Author: Kay Ousterhout kayousterh...@gmail.com Authored: Wed Apr 8 10:26:45 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Wed Apr 8 10:27:07 2015 -0700 -- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3b655680/core/src/test/scala/org/apache/spark/ShuffleSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index f57921b..30b6184 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -242,14 +242,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex shuffleSpillCompress - Set(true, false); shuffleCompress - Set(true, false) ) { 
- val conf = new SparkConf() + val myConf = conf.clone() .setAppName(test) .setMaster(local) .set(spark.shuffle.spill.compress, shuffleSpillCompress.toString) .set(spark.shuffle.compress, shuffleCompress.toString) .set(spark.shuffle.memoryFraction, 0.001) resetSparkContext() - sc = new SparkContext(conf) + sc = new SparkContext(myConf) try { sc.parallelize(0 until 10).map(i = (i / 4, i)).groupByKey().collect() } catch { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6765] Fix test code style for graphx.
Repository: spark Updated Branches: refs/heads/master 9d44ddce1 - 8d812f998 [SPARK-6765] Fix test code style for graphx. So we can turn style checker on for test code. Author: Reynold Xin r...@databricks.com Closes #5410 from rxin/test-style-graphx and squashes the following commits: 89e253a [Reynold Xin] [SPARK-6765] Fix test code style for graphx. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d812f99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d812f99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d812f99 Branch: refs/heads/master Commit: 8d812f9986f2edf420a18ca822711c9765f480e2 Parents: 9d44ddc Author: Reynold Xin r...@databricks.com Authored: Wed Apr 8 11:31:48 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 11:31:48 2015 -0700 -- .../org/apache/spark/graphx/GraphSuite.scala| 71 +++- .../apache/spark/graphx/LocalSparkContext.scala | 2 +- .../apache/spark/graphx/VertexRDDSuite.scala| 26 +++ .../graphx/lib/ConnectedComponentsSuite.scala | 18 ++--- .../apache/spark/graphx/lib/PageRankSuite.scala | 33 - .../lib/StronglyConnectedComponentsSuite.scala | 23 --- 6 files changed, 88 insertions(+), 85 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8d812f99/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 8d15150..a570e4e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext { val doubleRing = ring ++ ring val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1) assert(graph.edges.count() === doubleRing.size) - assert(graph.edges.collect.forall(e = e.attr == 1)) + 
assert(graph.edges.collect().forall(e = e.attr == 1)) // uniqueEdges option should uniquify edges and store duplicate count in edge attributes val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut)) assert(uniqueGraph.edges.count() === ring.size) - assert(uniqueGraph.edges.collect.forall(e = e.attr == 2)) + assert(uniqueGraph.edges.collect().forall(e = e.attr == 2)) } } @@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert( graph.edges.count() === rawEdges.size ) // Vertices not explicitly provided but referenced by edges should be created automatically assert( graph.vertices.count() === 100) - graph.triplets.collect.map { et = + graph.triplets.collect().map { et = assert((et.srcId 10 et.srcAttr) || (et.srcId = 10 !et.srcAttr)) assert((et.dstId 10 et.dstAttr) || (et.dstId = 10 !et.dstAttr)) } @@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc = val n = 5 val star = starGraph(sc, n) - assert(star.triplets.map(et = (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === -(1 to n).map(x = (0: VertexId, x: VertexId, v, v)).toSet) + assert(star.triplets.map(et = (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet +=== (1 to n).map(x = (0: VertexId, x: VertexId, v, v)).toSet) } } test(partitionBy) { withSpark { sc = - def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0) - def nonemptyParts(graph: Graph[Int, Int]) = { + def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = { +Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0) + } + def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = { graph.edges.partitionsRDD.mapPartitions { iter = Iterator(iter.next()._2.iterator.toList) }.filter(_.nonEmpty) @@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1) // 
partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into // the same partition - assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1) + assert( + nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1) // partitionBy(EdgePartition2D) puts identical edges in the same partition assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count
spark git commit: [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme
Repository: spark Updated Branches: refs/heads/master 6ada4f6f5 -> 2f482d706 [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme Fixed the following error query.where('key > 30).select(avg('key)).collect() <console>:43: error: value > is not a member of Symbol query.where('key > 30).select(avg('key)).collect() Author: Tijo Thomas tijopara...@gmail.com Closes #5415 from tijoparacka/ERROR_SQL_DATAFRAME_EXAMPLE and squashes the following commits: 234751e [Tijo Thomas] Fixed Query DSL error in spark sql Readme Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f482d70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f482d70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f482d70 Branch: refs/heads/master Commit: 2f482d706b9d38820472c3152dbd1612c98729bd Parents: 6ada4f6 Author: Tijo Thomas tijopara...@gmail.com Authored: Wed Apr 8 13:42:29 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 13:42:29 2015 -0700 -- sql/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f482d70/sql/README.md -- diff --git a/sql/README.md b/sql/README.md index fbb3200..237620e 100644 --- a/sql/README.md +++ b/sql/README.md @@ -56,6 +56,6 @@ res2: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,v You can also build further queries on top of these `DataFrames` using the query DSL. ``` -scala> query.where('key > 30).select(avg('key)).collect() +scala> query.where(query("key") > 30).select(avg(query("key"))).collect() res3: Array[org.apache.spark.sql.Row] = Array([274.79025423728814]) ``` - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6781] [SQL] use sqlContext in python shell
Repository: spark Updated Branches: refs/heads/master 66159c350 - 6ada4f6f5 [SPARK-6781] [SQL] use sqlContext in python shell Use `sqlContext` in PySpark shell, make it consistent with SQL programming guide. `sqlCtx` is also kept for compatibility. Author: Davies Liu dav...@databricks.com Closes #5425 from davies/sqlCtx and squashes the following commits: af67340 [Davies Liu] sqlCtx - sqlContext 15a278f [Davies Liu] use sqlContext in python shell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ada4f6f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ada4f6f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ada4f6f Branch: refs/heads/master Commit: 6ada4f6f52cf1d992c7ab0c32318790cf08b0a0d Parents: 66159c3 Author: Davies Liu dav...@databricks.com Authored: Wed Apr 8 13:31:45 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 13:31:45 2015 -0700 -- docs/ml-guide.md| 2 +- docs/sql-programming-guide.md | 4 +- .../apache/spark/examples/sql/JavaSparkSQL.java | 20 ++--- .../ml/simple_text_classification_pipeline.py | 2 +- .../src/main/python/mllib/dataset_example.py| 6 +- python/pyspark/ml/classification.py | 4 +- python/pyspark/ml/feature.py| 4 +- python/pyspark/shell.py | 6 +- python/pyspark/sql/context.py | 79 ++-- python/pyspark/sql/dataframe.py | 6 +- python/pyspark/sql/functions.py | 2 +- python/pyspark/sql/types.py | 4 +- 12 files changed, 69 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index c08c76d..771a071 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import Row, SQLContext sc = SparkContext(appName=SimpleTextClassificationPipeline) -sqlCtx = SQLContext(sc) +sqlContext = SQLContext(sc) # Prepare training documents, which are labeled. 
LabeledDocument = Row(id, text, label) http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4441d6a..663f656 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1642,7 +1642,7 @@ moved into the udf object in `SQLContext`. div data-lang=scala markdown=1 {% highlight java %} -sqlCtx.udf.register(strLen, (s: String) = s.length()) +sqlContext.udf.register(strLen, (s: String) = s.length()) {% endhighlight %} /div @@ -1650,7 +1650,7 @@ sqlCtx.udf.register(strLen, (s: String) = s.length()) div data-lang=java markdown=1 {% highlight java %} -sqlCtx.udf().register(strLen, (String s) - { s.length(); }); +sqlContext.udf().register(strLen, (String s) - { s.length(); }); {% endhighlight %} /div http://git-wip-us.apache.org/repos/asf/spark/blob/6ada4f6f/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index dee7948..8159ffb 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -55,7 +55,7 @@ public class JavaSparkSQL { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(JavaSparkSQL); JavaSparkContext ctx = new JavaSparkContext(sparkConf); -SQLContext sqlCtx = new SQLContext(ctx); +SQLContext sqlContext = new SQLContext(ctx); System.out.println(=== Data source: RDD ===); // Load a text file and convert each line to a Java Bean. @@ -74,11 +74,11 @@ public class JavaSparkSQL { }); // Apply a schema to an RDD of Java Beans and register it as a table. 
-DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class); +DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable(people); // SQL can be run over RDDs that have been registered as tables. -DataFrame teenagers = sqlCtx.sql(SELECT name FROM people WHERE age = 13 AND age = 19); +DataFrame teenagers = sqlContext.sql(SELECT name FROM people WHERE age = 13 AND age = 19); // The results of SQL queries are
spark git commit: [SPARK-6781] [SQL] use sqlContext in python shell
Repository: spark Updated Branches: refs/heads/branch-1.3 3b655680c - e1afd479b [SPARK-6781] [SQL] use sqlContext in python shell Use `sqlContext` in PySpark shell, make it consistent with SQL programming guide. `sqlCtx` is also kept for compatibility. Author: Davies Liu dav...@databricks.com Closes #5425 from davies/sqlCtx and squashes the following commits: af67340 [Davies Liu] sqlCtx - sqlContext 15a278f [Davies Liu] use sqlContext in python shell (cherry picked from commit 6ada4f6f52cf1d992c7ab0c32318790cf08b0a0d) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1afd479 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1afd479 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1afd479 Branch: refs/heads/branch-1.3 Commit: e1afd479b3446483e6e1626afdec549cc214d80e Parents: 3b65568 Author: Davies Liu dav...@databricks.com Authored: Wed Apr 8 13:31:45 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 13:32:00 2015 -0700 -- docs/ml-guide.md| 2 +- docs/sql-programming-guide.md | 4 +- .../apache/spark/examples/sql/JavaSparkSQL.java | 20 ++--- .../ml/simple_text_classification_pipeline.py | 2 +- .../src/main/python/mllib/dataset_example.py| 6 +- python/pyspark/ml/classification.py | 4 +- python/pyspark/ml/feature.py| 4 +- python/pyspark/shell.py | 6 +- python/pyspark/sql/context.py | 79 ++-- python/pyspark/sql/dataframe.py | 6 +- python/pyspark/sql/functions.py | 2 +- python/pyspark/sql/types.py | 4 +- 12 files changed, 69 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index c08c76d..771a071 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import Row, SQLContext sc = 
SparkContext(appName=SimpleTextClassificationPipeline) -sqlCtx = SQLContext(sc) +sqlContext = SQLContext(sc) # Prepare training documents, which are labeled. LabeledDocument = Row(id, text, label) http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4441d6a..663f656 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1642,7 +1642,7 @@ moved into the udf object in `SQLContext`. div data-lang=scala markdown=1 {% highlight java %} -sqlCtx.udf.register(strLen, (s: String) = s.length()) +sqlContext.udf.register(strLen, (s: String) = s.length()) {% endhighlight %} /div @@ -1650,7 +1650,7 @@ sqlCtx.udf.register(strLen, (s: String) = s.length()) div data-lang=java markdown=1 {% highlight java %} -sqlCtx.udf().register(strLen, (String s) - { s.length(); }); +sqlContext.udf().register(strLen, (String s) - { s.length(); }); {% endhighlight %} /div http://git-wip-us.apache.org/repos/asf/spark/blob/e1afd479/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index dee7948..8159ffb 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -55,7 +55,7 @@ public class JavaSparkSQL { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(JavaSparkSQL); JavaSparkContext ctx = new JavaSparkContext(sparkConf); -SQLContext sqlCtx = new SQLContext(ctx); +SQLContext sqlContext = new SQLContext(ctx); System.out.println(=== Data source: RDD ===); // Load a text file and convert each line to a Java Bean. 
@@ -74,11 +74,11 @@ public class JavaSparkSQL { }); // Apply a schema to an RDD of Java Beans and register it as a table. -DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class); +DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); schemaPeople.registerTempTable(people); // SQL can be run over RDDs that have been registered as tables. -DataFrame teenagers = sqlCtx.sql(SELECT name FROM people WHERE age = 13 AND age = 19); +
spark git commit: [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme
Repository: spark Updated Branches: refs/heads/branch-1.3 e1afd479b - 4453c591a [SPARK-6767][SQL] Fixed Query DSL error in spark sql Readme Fixed the following error query.where('key 30).select(avg('key)).collect() console:43: error: value is not a member of Symbol query.where('key 30).select(avg('key)).collect() Author: Tijo Thomas tijopara...@gmail.com Closes #5415 from tijoparacka/ERROR_SQL_DATAFRAME_EXAMPLE and squashes the following commits: 234751e [Tijo Thomas] Fixed Query DSL error in spark sql Readme (cherry picked from commit 2f482d706b9d38820472c3152dbd1612c98729bd) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4453c591 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4453c591 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4453c591 Branch: refs/heads/branch-1.3 Commit: 4453c591a2eaa9381766f6155bfd3e7749f721e0 Parents: e1afd47 Author: Tijo Thomas tijopara...@gmail.com Authored: Wed Apr 8 13:42:29 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 13:43:05 2015 -0700 -- sql/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4453c591/sql/README.md -- diff --git a/sql/README.md b/sql/README.md index a792499..0125de6 100644 --- a/sql/README.md +++ b/sql/README.md @@ -57,6 +57,6 @@ res2: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,v You can also build further queries on top of these `DataFrames` using the query DSL. ``` -scala query.where('key 30).select(avg('key)).collect() +scala query.where(query(key) 30).select(avg(query(key))).collect() res3: Array[org.apache.spark.sql.Row] = Array([274.79025423728814]) ``` - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6451][SQL] supported code generation for CombineSum
Repository: spark Updated Branches: refs/heads/master 941828054 - 7d7384c78 [SPARK-6451][SQL] supported code generation for CombineSum Author: Venkata Ramana Gollamudi ramana.gollam...@huawei.com Closes #5138 from gvramana/sum_fix_codegen and squashes the following commits: 95f5fe4 [Venkata Ramana Gollamudi] rebase merge changes 12f45a5 [Venkata Ramana Gollamudi] Combined and added code generations tests as per comment d6a76ac [Venkata Ramana Gollamudi] added support for codegeneration for CombineSum and tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d7384c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d7384c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d7384c7 Branch: refs/heads/master Commit: 7d7384c781ea72e1eabab3daca2e237e3b0fc666 Parents: 9418280 Author: Venkata Ramana Gollamudi ramana.gollam...@huawei.com Authored: Wed Apr 8 18:42:34 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Wed Apr 8 18:42:34 2015 -0700 -- .../sql/execution/GeneratedAggregate.scala | 44 +- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 92 +++- 3 files changed, 133 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d7384c7/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index a8018b9..861a2c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -99,7 +99,10 @@ case class GeneratedAggregate( // but really, common sub expression elimination would be better val zero = Cast(Literal(0), calcType) val updateFunction = Coalesce( - 
Add(Coalesce(currentSum :: zero :: Nil), Cast(expr, calcType)) :: currentSum :: Nil) + Add( +Coalesce(currentSum :: zero :: Nil), +Cast(expr, calcType) + ) :: currentSum :: zero :: Nil) val result = expr.dataType match { case DecimalType.Fixed(_, _) = @@ -109,6 +112,45 @@ case class GeneratedAggregate( AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + case cs @ CombineSum(expr) = +val calcType = expr.dataType + expr.dataType match { +case DecimalType.Fixed(_, _) = + DecimalType.Unlimited +case _ = + expr.dataType + } + +val currentSum = AttributeReference(currentSum, calcType, nullable = true)() +val initialValue = Literal.create(null, calcType) + +// Coalasce avoids double calculation... +// but really, common sub expression elimination would be better +val zero = Cast(Literal(0), calcType) +// If we're evaluating UnscaledValue(x), we can do Count on x directly, since its +// UnscaledValue will be null if and only if x is null; helps with Average on decimals +val actualExpr = expr match { + case UnscaledValue(e) = e + case _ = expr +} +// partial sum result can be null only when no input rows present +val updateFunction = If( + IsNotNull(actualExpr), + Coalesce( +Add( + Coalesce(currentSum :: zero :: Nil), + Cast(expr, calcType)) :: currentSum :: zero :: Nil), + currentSum) + +val result = + expr.dataType match { +case DecimalType.Fixed(_, _) = + Cast(currentSum, cs.dataType) +case _ = currentSum + } + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + case a @ Average(expr) = val calcType = expr.dataType match { http://git-wip-us.apache.org/repos/asf/spark/blob/7d7384c7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f754fa7..23f7e56 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -155,7 +155,7 @@ private[sql] abstract
spark git commit: [SPARK-5242]: Add --private-ips flag to EC2 script
Repository: spark Updated Branches: refs/heads/master 2f482d706 - 86403f552 [SPARK-5242]: Add --private-ips flag to EC2 script The `spark_ec2.py` script currently references the `ip_address` and `public_dns_name` attributes of an instance. On private networks, these fields aren't set, so we have problems. This PR introduces a `--private-ips` flag that instead refers to the `private_ip_address` attribute in both cases. Author: Michelangelo D'Agostino mdagost...@civisanalytics.com Closes #5244 from mdagost/ec2_private_nets and squashes the following commits: b684c67 [Michelangelo D'Agostino] STY: A few python lint changes. a4a2eac [Michelangelo D'Agostino] ENH: Fix IP's typo and refactor conditional logic into functions. c004604 [Michelangelo D'Agostino] ENH: Add --private-ips flag. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86403f55 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86403f55 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86403f55 Branch: refs/heads/master Commit: 86403f5525782bc9656ab11790f7020baa6b2c1f Parents: 2f482d7 Author: Michelangelo D'Agostino mdagost...@civisanalytics.com Authored: Wed Apr 8 16:48:45 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Wed Apr 8 16:48:45 2015 -0400 -- ec2/spark_ec2.py | 64 +-- 1 file changed, 47 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/86403f55/ec2/spark_ec2.py -- diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 879a52c..0c1f247 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -282,6 +282,10 @@ def parse_args(): parser.add_option( --vpc-id, default=None, help=VPC to launch instances in) +parser.add_option( +--private-ips, action=store_true, default=False, +help=Use private IPs for instances rather than public if VPC/subnet + + requires that.) 
(opts, args) = parser.parse_args() if len(args) != 2: @@ -707,7 +711,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): -master = master_nodes[0].public_dns_name +master = get_dns_name(master_nodes[0], opts.private_ips) if deploy_ssh_key: print Generating cluster's SSH key on master... key_setup = @@ -719,8 +723,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) print Transferring cluster's SSH key to slaves... for slave in slave_nodes: -print slave.public_dns_name -ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar) +slave_address = get_dns_name(slave, opts.private_ips) +print slave_address +ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon'] @@ -809,7 +814,8 @@ def is_cluster_ssh_available(cluster_instances, opts): Check if SSH is available on all the instances in a cluster. for i in cluster_instances: -if not is_ssh_available(host=i.public_dns_name, opts=opts): +dns_name = get_dns_name(i, opts.private_ips) +if not is_ssh_available(host=dns_name, opts=opts): return False else: return True @@ -923,7 +929,7 @@ def get_num_disks(instance_type): # # root_dir should be an absolute path to the directory with the files we want to deploy. 
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): -active_master = master_nodes[0].public_dns_name +active_master = get_dns_name(master_nodes[0], opts.private_ips) num_disks = get_num_disks(opts.instance_type) hdfs_data_dirs = /mnt/ephemeral-hdfs/data @@ -948,10 +954,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): print Deploying Spark via git hash; Tachyon won't be set up modules = filter(lambda x: x != tachyon, modules) +master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] +slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes] template_vars = { -master_list: '\n'.join([i.public_dns_name for i in master_nodes]), +master_list: '\n'.join(master_addresses), active_master: active_master, -slave_list: '\n'.join([i.public_dns_name for i in slave_nodes]), +slave_list:
[5/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/RDD.R -- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R new file mode 100644 index 000..604ad03 --- /dev/null +++ b/R/pkg/R/RDD.R @@ -0,0 +1,1539 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# RDD in R implemented in S4 OO system. + +setOldClass(jobj) + +#' @title S4 class that represents an RDD +#' @description RDD can be created using functions like +#' \code{parallelize}, \code{textFile} etc. +#' @rdname RDD +#' @seealso parallelize, textFile +#' +#' @slot env An R environment that stores bookkeeping states of the RDD +#' @slot jrdd Java object reference to the backing JavaRDD +#' to an RDD +#' @export +setClass(RDD, + slots = list(env = environment, + jrdd = jobj)) + +setClass(PipelinedRDD, + slots = list(prev = RDD, + func = function, + prev_jrdd = jobj), + contains = RDD) + +setMethod(initialize, RDD, function(.Object, jrdd, serializedMode, +isCached, isCheckpointed) { + # Check that RDD constructor is using the correct version of serializedMode + stopifnot(class(serializedMode) == character) + stopifnot(serializedMode %in% c(byte, string, row)) + # RDD has three serialization types: + # byte: The RDD stores data serialized in R. 
+ # string: The RDD stores data as strings. + # row: The RDD stores the serialized rows of a DataFrame. + + # We use an environment to store mutable states inside an RDD object. + # Note that R's call-by-value semantics makes modifying slots inside an + # object (passed as an argument into a function, such as cache()) difficult: + # i.e. one needs to make a copy of the RDD object and sets the new slot value + # there. + + # The slots are inheritable from superclass. Here, both `env' and `jrdd' are + # inherited from RDD, but only the former is used. + .Object@env - new.env() + .Object@env$isCached - isCached + .Object@env$isCheckpointed - isCheckpointed + .Object@env$serializedMode - serializedMode + + .Object@jrdd - jrdd + .Object +}) + +setMethod(initialize, PipelinedRDD, function(.Object, prev, func, jrdd_val) { + .Object@env - new.env() + .Object@env$isCached - FALSE + .Object@env$isCheckpointed - FALSE + .Object@env$jrdd_val - jrdd_val + if (!is.null(jrdd_val)) { +# This tracks the serialization mode for jrdd_val +.Object@env$serializedMode - prev@env$serializedMode + } + + .Object@prev - prev + + isPipelinable - function(rdd) { +e - rdd@env +!(e$isCached || e$isCheckpointed) + } + + if (!inherits(prev, PipelinedRDD) || !isPipelinable(prev)) { +# This transformation is the first in its stage: +.Object@func - func +.Object@prev_jrdd - getJRDD(prev) +.Object@env$prev_serializedMode - prev@env$serializedMode +# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD +# prev_serializedMode is used during the delayed computation of JRDD in getJRDD + } else { +pipelinedFunc - function(split, iterator) { + func(split, prev@func(split, iterator)) +} +.Object@func - pipelinedFunc +.Object@prev_jrdd - prev@prev_jrdd # maintain the pipeline +# Get the serialization mode of the parent RDD +.Object@env$prev_serializedMode - prev@env$prev_serializedMode + } + + .Object +}) + +#' @rdname RDD +#' @export +#' +#' @param jrdd Java object reference to the 
backing JavaRDD +#' @param serializedMode Use byte if the RDD stores data serialized in R, string if the RDD +#' stores strings, and row if the RDD stores the rows of a DataFrame +#' @param isCached TRUE if the RDD is cached +#' @param isCheckpointed TRUE if the RDD has been checkpointed +RDD - function(jrdd, serializedMode = byte, isCached = FALSE, +isCheckpointed = FALSE) { + new(RDD, jrdd, serializedMode, isCached, isCheckpointed) +} + +PipelinedRDD - function(prev, func) { + new(PipelinedRDD, prev, func, NULL) +} + +# Return the serialization mode for an RDD. +setGeneric(getSerializedMode, function(rdd, ...) { standardGeneric(getSerializedMode) }) +# For normal RDDs we can
[2/2] spark git commit: [SPARK-6765] Fix test code style for SQL
[SPARK-6765] Fix test code style for SQL So we can turn style checker on for test code. Author: Reynold Xin r...@databricks.com Closes #5412 from rxin/test-style-sql and squashes the following commits: 9098a31 [Reynold Xin] One more compilation error ... 8c7250a [Reynold Xin] Fix compilation. 82d0944 [Reynold Xin] Indentation. 0b03fbb [Reynold Xin] code review. f2f4348 [Reynold Xin] oops. ef4ec48 [Reynold Xin] Hive module. 7e0db5e [Reynold Xin] sql module 04ec7ac [Reynold Xin] catalyst module Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b2aab8d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b2aab8d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b2aab8d Branch: refs/heads/master Commit: 1b2aab8d5b9cc2ff702506038bd71aa8debe7ca0 Parents: 891ada5 Author: Reynold Xin r...@databricks.com Authored: Wed Apr 8 20:35:29 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 8 20:35:29 2015 -0700 -- .../spark/sql/catalyst/DistributionSuite.scala | 3 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 10 +- .../analysis/HiveTypeCoercionSuite.scala| 8 +- .../expressions/ExpressionEvaluationSuite.scala | 134 +-- .../optimizer/ConstantFoldingSuite.scala| 51 --- .../optimizer/FilterPushdownSuite.scala | 3 +- .../catalyst/optimizer/OptimizeInSuite.scala| 2 +- .../spark/sql/catalyst/plans/PlanTest.scala | 5 +- .../sql/catalyst/plans/SameResultSuite.scala| 2 +- .../sql/catalyst/trees/TreeNodeSuite.scala | 8 +- .../org/apache/spark/sql/CachedTableSuite.scala | 3 +- .../org/apache/spark/sql/DataFrameSuite.scala | 3 +- .../scala/org/apache/spark/sql/QueryTest.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 30 +++-- .../sql/ScalaReflectionRelationSuite.scala | 5 +- .../apache/spark/sql/UserDefinedTypeSuite.scala | 2 +- .../spark/sql/columnar/ColumnarTestUtils.scala | 4 +- .../columnar/NullableColumnAccessorSuite.scala | 3 +- 
.../columnar/NullableColumnBuilderSuite.scala | 3 +- .../TestCompressibleColumnBuilder.scala | 2 +- .../sql/execution/debug/DebuggingSuite.scala| 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 98 +++--- .../org/apache/spark/sql/json/JsonSuite.scala | 17 ++- .../spark/sql/parquet/ParquetIOSuite.scala | 2 +- .../spark/sql/parquet/ParquetSchemaSuite.scala | 2 + .../apache/spark/sql/sources/DDLTestSuite.scala | 8 +- .../spark/sql/sources/FilteredScanSuite.scala | 3 +- .../spark/sql/sources/PrunedScanSuite.scala | 5 +- .../spark/sql/sources/SaveLoadSuite.scala | 2 +- .../spark/sql/sources/TableScanSuite.scala | 9 +- .../spark/sql/hive/ErrorPositionSuite.scala | 2 +- .../spark/sql/hive/HiveInspectorSuite.scala | 33 +++-- .../sql/hive/InsertIntoHiveTableSuite.scala | 57 ++-- .../apache/spark/sql/hive/StatisticsSuite.scala | 2 +- .../hive/execution/BigDataBenchmarkSuite.scala | 12 +- .../sql/hive/execution/HiveComparisonTest.scala | 27 ++-- .../sql/hive/execution/HiveQueryFileTest.scala | 11 +- .../sql/hive/execution/HiveQuerySuite.scala | 13 +- .../hive/execution/HiveResolutionSuite.scala| 3 +- .../sql/hive/execution/HiveSerDeSuite.scala | 3 +- .../hive/execution/HiveTypeCoercionSuite.scala | 6 +- .../spark/sql/hive/execution/HiveUdfSuite.scala | 16 ++- .../spark/sql/hive/execution/PruningSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- .../apache/spark/sql/hive/parquetSuites.scala | 7 +- 45 files changed, 395 insertions(+), 234 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala index 46b2250..ea82cd2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala @@ -30,7 +30,7 @@ class DistributionSuite extends FunSuite { inputPartitioning: Partitioning, requiredDistribution: Distribution, satisfied: Boolean) { -if (inputPartitioning.satisfies(requiredDistribution) != satisfied) +if (inputPartitioning.satisfies(requiredDistribution) != satisfied) { fail( s |== Input Partitioning == @@ -40,6 +40,7 @@ class DistributionSuite extends FunSuite { |== Does input partitioning satisfy required distribution? ==
spark git commit: [SPARK-6696] [SQL] Adds HiveContext.refreshTable to PySpark
Repository: spark Updated Branches: refs/heads/master 7d7384c78 - 891ada5be [SPARK-6696] [SQL] Adds HiveContext.refreshTable to PySpark !-- Reviewable:start -- [img src=https://reviewable.io/review_button.png; height=40 alt=Review on Reviewable/](https://reviewable.io/reviews/apache/spark/5349) !-- Reviewable:end -- Author: Cheng Lian l...@databricks.com Closes #5349 from liancheng/py-refresh-table and squashes the following commits: 004bec0 [Cheng Lian] Adds HiveContext.refreshTable to PySpark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/891ada5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/891ada5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/891ada5b Branch: refs/heads/master Commit: 891ada5be1e7fdd796380e2626d80843f2ef6017 Parents: 7d7384c Author: Cheng Lian l...@databricks.com Authored: Wed Apr 8 18:47:39 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Wed Apr 8 18:47:39 2015 -0700 -- python/pyspark/sql/context.py | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/891ada5b/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 93e2d17..e8529a8 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -574,6 +574,15 @@ class HiveContext(SQLContext): def _get_hive_ctx(self): return self._jvm.HiveContext(self._jsc.sc()) +def refreshTable(self, tableName): +Invalidate and refresh all the cached the metadata of the given +table. For performance reasons, Spark SQL or the external data source +library it uses might cache certain metadata about a table, such as the +location of blocks. When those change outside of Spark SQL, users should +call this function to invalidate the cache. + +self._ssql_ctx.refreshTable(tableName) + class UDFRegistration(object): Wrapper for user-defined function registration. 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[7/7] spark git commit: [SPARK-5654] Integrate SparkR
[SPARK-5654] Integrate SparkR This pull request integrates SparkR, an R frontend for Spark. The SparkR package contains both RDD and DataFrame APIs in R and is integrated with Spark's submission scripts to work on different cluster managers. Some integration points that would be great to get feedback on: 1. Build procedure: SparkR requires R to be installed on the machine to be built. Right now we have a new Maven profile `-PsparkR` that can be used to enable SparkR builds. 2. YARN cluster mode: The R package that is built needs to be present on the driver and all the worker nodes during execution. The R package location is currently set using SPARK_HOME, but this might not work on YARN cluster mode. The SparkR package represents the work of many contributors and attached below is a list of people along with areas they worked on: edwardt (edwart) - Documentation improvements Felix Cheung (felixcheung) - Documentation improvements Hossein Falaki (falaki) - Documentation improvements Chris Freeman (cafreeman) - DataFrame API, Programming Guide Todd Gao (7c00) - R worker Internals Ryan Hafen (hafen) - SparkR Internals Qian Huang (hqzizania) - RDD API Hao Lin (hlin09) - RDD API, Closure cleaner Evert Lammerts (evertlammerts) - DataFrame API Davies Liu (davies) - DataFrame API, R worker internals, Merging with Spark Yi Lu (lythesia) - RDD API, Worker internals Matt Massie (massie) - Jenkins build Harihar Nahak (hnahak87) - SparkR examples Oscar Olmedo (oscaroboto) - Spark configuration Antonio Piccolboni (piccolbo) - SparkR examples, Namespace bug fixes Dan Putler (dputler) - DataFrame API, SparkR Install Guide Ashutosh Raina (ashutoshraina) - Build improvements Josh Rosen (joshrosen) - Travis CI build Sun Rui (sun-rui) - RDD API, JVM Backend, Shuffle improvements Shivaram Venkataraman (shivaram) - RDD API, JVM Backend, Worker Internals Zongheng Yang (concretevitamin) - RDD API, Pipelined RDDs, Examples and EC2 guide Author: Shivaram Venkataraman shiva...@cs.berkeley.edu 
Author: Shivaram Venkataraman shivaram.venkatara...@gmail.com Author: Zongheng Yang zonghen...@gmail.com Author: cafreeman cfree...@alteryx.com Author: Shivaram Venkataraman shiva...@eecs.berkeley.edu Author: Davies Liu dav...@databricks.com Author: Davies Liu davies@gmail.com Author: hlin09 hlin0...@gmail.com Author: Sun Rui rui@intel.com Author: lythesia iranaik...@gmail.com Author: oscaroboto osca...@gmail.com Author: Antonio Piccolboni anto...@piccolboni.info Author: root edward Author: edwardt edwardt.t...@gmail.com Author: hqzizania qian.hu...@intel.com Author: dputler dan.put...@gmail.com Author: Todd Gao todd.gao.2...@gmail.com Author: Chris Freeman cfree...@alteryx.com Author: Felix Cheung fcheung@AVVOMAC-119.local Author: Hossein hoss...@databricks.com Author: Evert Lammerts ev...@apache.org Author: Felix Cheung fche...@avvomac-119.t-mobile.com Author: felixcheung felixcheun...@hotmail.com Author: Ryan Hafen rha...@gmail.com Author: Ashutosh Raina ashutoshra...@users.noreply.github.com Author: Oscar Olmedo osca...@gmail.com Author: Josh Rosen rosenvi...@gmail.com Author: Yi Lu iranaik...@gmail.com Author: Harihar Nahak hnaha...@users.noreply.github.com Closes #5096 from shivaram/R and squashes the following commits: da64742 [Davies Liu] fix Date serialization 59266d1 [Davies Liu] check exclusive of primary-py-file and primary-r-file 55808e4 [Davies Liu] fix tests 5581c75 [Davies Liu] update author of SparkR f731b48 [Shivaram Venkataraman] Only run SparkR tests if R is installed 64eda24 [Shivaram Venkataraman] Merge branch 'R' of https://github.com/amplab-extras/spark into R d7c3f22 [Shivaram Venkataraman] Address code review comments Changes include 1. Adding SparkR docs to API docs generated 2. Style fixes in SparkR scala files 3. 
Clean up of shell scripts and explanation of install-dev.sh 377151f [Shivaram Venkataraman] Merge remote-tracking branch 'apache/master' into R eb5da53 [Shivaram Venkataraman] Merge pull request #3 from davies/R2 a18ff5c [Davies Liu] Update sparkR.R 5133f3a [Shivaram Venkataraman] Merge pull request #7 from hqzizania/R3 940b631 [hqzizania] [SPARKR-92] Phase 2: implement sum(rdd) 0e788c0 [Shivaram Venkataraman] Merge pull request #5 from hlin09/doc-fix 3487461 [hlin09] Add tests log in .gitignore. 1d1802e [Shivaram Venkataraman] Merge pull request #4 from felixcheung/r-require 11981b7 [felixcheung] Update R to fail early if SparkR package is missing c300e08 [Davies Liu] remove duplicated file b045701 [Davies Liu] Merge branch 'remote_r' into R 19c9368 [Davies Liu] Merge branch 'sparkr-sql' of github.com:amplab-extras/SparkR-pkg into remote_r f8fa8af [Davies Liu] mute logging when start/stop context e7104b6 [Davies Liu] remove ::: in SparkR a1777eb [Davies Liu] move rules into R/.gitignore e88b649 [Davies Liu] Merge branch 'R' of github.com:amplab-extras/spark into R 6e20e71 [Davies Liu] address comments b433817 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R
[1/2] spark git commit: [SPARK-6765] Fix test code style for SQL
Repository: spark Updated Branches: refs/heads/master 891ada5be - 1b2aab8d5 http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala index 02518d5..f7b37da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.util._ /** * A framework for running the query tests that are listed as a set of text files. * - * TestSuites that derive from this class must provide a map of testCaseName - testCaseFiles that should be included. - * Additionally, there is support for whitelisting and blacklisting tests as development progresses. + * TestSuites that derive from this class must provide a map of testCaseName - testCaseFiles + * that should be included. Additionally, there is support for whitelisting and blacklisting + * tests as development progresses. */ abstract class HiveQueryFileTest extends HiveComparisonTest { /** A list of tests deemed out of scope and thus completely disregarded */ @@ -54,15 +55,17 @@ abstract class HiveQueryFileTest extends HiveComparisonTest { case (testCaseName, testCaseFile) = if (blackList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_)) { logDebug(sBlacklisted test skipped $testCaseName) - } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) || runAll) { + } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) || +runAll) { // Build a test case and submit it to scala test framework... 
val queriesString = fileToString(testCaseFile) createQueryTest(testCaseName, queriesString) } else { // Only output warnings for the built in whitelist as this clutters the output when the user // trying to execute a single test from the commandline. -if(System.getProperty(whiteListProperty) == null !runAll) +if (System.getProperty(whiteListProperty) == null !runAll) { ignore(testCaseName) {} +} } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1b2aab8d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index de140fc..af781a5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -37,7 +37,8 @@ import org.apache.spark.sql.hive.test.TestHive._ case class TestData(a: Int, b: String) /** - * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. + * A set of test cases expressed in Hive QL that are not covered by the tests + * included in the hive distribution. 
*/ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { private val originalTimeZone = TimeZone.getDefault @@ -237,7 +238,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } createQueryTest(modulus, -SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \true\, \false\), (101 / 2) % 10 FROM src LIMIT 1) +SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \true\, \false\), + + (101 / 2) % 10 FROM src LIMIT 1) test(Query expressed in SQL) { setConf(spark.sql.dialect, sql) @@ -309,7 +311,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { SELECT * FROM src a JOIN src b ON a.key = b.key) createQueryTest(small.cartesian, -SELECT a.key, b.key FROM (SELECT key FROM src WHERE key 1) a JOIN (SELECT key FROM src WHERE key = 2) b) +SELECT a.key, b.key FROM (SELECT key FROM src WHERE key 1) a JOIN + + (SELECT key FROM src WHERE key = 2) b) createQueryTest(length.udf, SELECT length(\test\) FROM src LIMIT 1) @@ -457,6 +460,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest(lateral view3, FROM src SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX) + // scalastyle:off createQueryTest(lateral view4, |create table src_lv1 (key string, value string); @@ -466,6 +470,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |insert overwrite table src_lv1 SELECT key,
spark git commit: [SQL][minor] remove duplicated resolveGetField and update comment
Repository: spark Updated Branches: refs/heads/master 55a92ef34 - 941828054 [SQL][minor] remove duplicated resolveGetField and update comment It's after https://github.com/apache/spark/pull/5189 Author: Wenchen Fan cloud0...@outlook.com Closes #5304 from cloud-fan/tmp and squashes the following commits: c58c9b3 [Wenchen Fan] remove duplicated code and update comment Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94182805 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94182805 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94182805 Branch: refs/heads/master Commit: 9418280547f962eaf309bfff9986cdd848409643 Parents: 55a92ef Author: Wenchen Fan cloud0...@outlook.com Authored: Wed Apr 8 13:57:01 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Wed Apr 8 13:57:01 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 32 +--- .../catalyst/plans/logical/LogicalPlan.scala| 13 +++- 2 files changed, 6 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94182805/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 119cb9c..b3aba4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -293,7 +293,7 @@ class Analyzer( logDebug(sResolving $u to $result) result case UnresolvedGetField(child, fieldName) if child.resolved = -resolveGetField(child, fieldName) +q.resolveGetField(child, fieldName, resolver) } } @@ -313,36 +313,6 @@ class Analyzer( */ protected def containsStar(exprs: Seq[Expression]): Boolean = exprs.exists(_.collect { case _: Star = true }.nonEmpty) - -/** - * Returns the 
resolved `GetField`, and report error if no desired field or over one - * desired fields are found. - */ -protected def resolveGetField(expr: Expression, fieldName: String): Expression = { - def findField(fields: Array[StructField]): Int = { -val checkField = (f: StructField) = resolver(f.name, fieldName) -val ordinal = fields.indexWhere(checkField) -if (ordinal == -1) { - throw new AnalysisException( -sNo such struct field $fieldName in ${fields.map(_.name).mkString(, )}) -} else if (fields.indexWhere(checkField, ordinal + 1) != -1) { - throw new AnalysisException( -sAmbiguous reference to fields ${fields.filter(checkField).mkString(, )}) -} else { - ordinal -} - } - expr.dataType match { -case StructType(fields) = - val ordinal = findField(fields) - StructGetField(expr, fields(ordinal), ordinal) -case ArrayType(StructType(fields), containsNull) = - val ordinal = findField(fields) - ArrayGetField(expr, fields(ordinal), ordinal, containsNull) -case otherType = - throw new AnalysisException(sGetField is not valid on fields of type $otherType) - } -} } /** http://git-wip-us.apache.org/repos/asf/spark/blob/94182805/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 2e9f3aa..d8f5858 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -205,11 +205,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { // One match, but we also need to extract the requested nested field. case Seq((a, nestedFields)) = try { - - // The foldLeft adds UnresolvedGetField for every remaining parts of the name, - // and aliased it with the last part of the name. 
- // For example, consider name a.b.c, where a is resolved to an existing attribute. - // Then this will add UnresolvedGetField(b) and UnresolvedGetField(c), and alias + // The foldLeft adds GetFields for every remaining parts of the identifier, + // and aliases it with the last part of the
[6/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE new file mode 100644 index 000..a354cdc --- /dev/null +++ b/R/pkg/NAMESPACE @@ -0,0 +1,182 @@ +#exportPattern(^[[:alpha:]]+) +exportClasses(RDD) +exportClasses(Broadcast) +exportMethods( + aggregateByKey, + aggregateRDD, + cache, + checkpoint, + coalesce, + cogroup, + collect, + collectAsMap, + collectPartition, + combineByKey, + count, + countByKey, + countByValue, + distinct, + Filter, + filterRDD, + first, + flatMap, + flatMapValues, + fold, + foldByKey, + foreach, + foreachPartition, + fullOuterJoin, + glom, + groupByKey, + join, + keyBy, + keys, + length, + lapply, + lapplyPartition, + lapplyPartitionsWithIndex, + leftOuterJoin, + lookup, + map, + mapPartitions, + mapPartitionsWithIndex, + mapValues, + maximum, + minimum, + numPartitions, + partitionBy, + persist, + pipeRDD, + reduce, + reduceByKey, + reduceByKeyLocally, + repartition, + rightOuterJoin, + sampleRDD, + saveAsTextFile, + saveAsObjectFile, + sortBy, + sortByKey, + sumRDD, + take, + takeOrdered, + takeSample, + top, + unionRDD, + unpersist, + value, + values, + zipRDD, + zipWithIndex, + zipWithUniqueId + ) + +# S3 methods exported +export( + textFile, + objectFile, + parallelize, + hashCode, + includePackage, + broadcast, + setBroadcastValue, + setCheckpointDir + ) +export(sparkR.init) +export(sparkR.stop) +export(print.jobj) +useDynLib(SparkR, stringHashCode) +importFrom(methods, setGeneric, setMethod, setOldClass) + +# SparkRSQL + +exportClasses(DataFrame) + +exportMethods(columns, + distinct, + dtypes, + explain, + filter, + groupBy, + head, + insertInto, + intersect, + isLocal, + limit, + orderBy, + names, + printSchema, + registerTempTable, + repartition, + sampleDF, + saveAsParquetFile, + saveAsTable, + saveDF, + schema, + select, + selectExpr, + show, + showDF, + sortDF, + subtract, + toJSON, + toRDD, + unionAll, + where, + withColumn, + withColumnRenamed) + 
+exportClasses(Column) + +exportMethods(abs, + alias, + approxCountDistinct, + asc, + avg, + cast, + contains, + countDistinct, + desc, + endsWith, + getField, + getItem, + isNotNull, + isNull, + last, + like, + lower, + max, + mean, + min, + rlike, + sqrt, + startsWith, + substr, + sum, + sumDistinct, + upper) + +exportClasses(GroupedData) +exportMethods(agg) + +export(sparkRSQL.init, + sparkRHive.init) + +export(cacheTable, + clearCache, + createDataFrame, + createExternalTable, + dropTempTable, + jsonFile, + jsonRDD, + loadDF, + parquetFile, + sql, + table, + tableNames, + tables, + toDF, + uncacheTable) + +export(print.structType, + print.structField) http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R new file mode 100644 index 000..feafd56 --- /dev/null +++ b/R/pkg/R/DataFrame.R @@ -0,0 +1,1270 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional
[4/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/context.R -- diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R new file mode 100644 index 000..2fc0bb2 --- /dev/null +++ b/R/pkg/R/context.R @@ -0,0 +1,225 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# context.R: SparkContext driven functions + +getMinSplits - function(sc, minSplits) { + if (is.null(minSplits)) { +defaultParallelism - callJMethod(sc, defaultParallelism) +minSplits - min(defaultParallelism, 2) + } + as.integer(minSplits) +} + +#' Create an RDD from a text file. +#' +#' This function reads a text file from HDFS, a local file system (available on all +#' nodes), or any Hadoop-supported file system URI, and creates an +#' RDD of strings from it. +#' +#' @param sc SparkContext to use +#' @param path Path of file to read. A vector of multiple paths is allowed. +#' @param minSplits Minimum number of splits to be created. If NULL, the default +#' value is chosen based on available parallelism. 
+#' @return RDD where each item is of type \code{character} +#' @export +#' @examples +#'\dontrun{ +#' sc - sparkR.init() +#' lines - textFile(sc, myfile.txt) +#'} +textFile - function(sc, path, minSplits = NULL) { + # Allow the user to have a more flexible definiton of the text file path + path - suppressWarnings(normalizePath(path)) + #' Convert a string vector of paths to a string containing comma separated paths + path - paste(path, collapse = ,) + + jrdd - callJMethod(sc, textFile, path, getMinSplits(sc, minSplits)) + # jrdd is of type JavaRDD[String] + RDD(jrdd, string) +} + +#' Load an RDD saved as a SequenceFile containing serialized objects. +#' +#' The file to be loaded should be one that was previously generated by calling +#' saveAsObjectFile() of the RDD class. +#' +#' @param sc SparkContext to use +#' @param path Path of file to read. A vector of multiple paths is allowed. +#' @param minSplits Minimum number of splits to be created. If NULL, the default +#' value is chosen based on available parallelism. +#' @return RDD containing serialized R objects. +#' @seealso saveAsObjectFile +#' @export +#' @examples +#'\dontrun{ +#' sc - sparkR.init() +#' rdd - objectFile(sc, myfile) +#'} +objectFile - function(sc, path, minSplits = NULL) { + # Allow the user to have a more flexible definiton of the text file path + path - suppressWarnings(normalizePath(path)) + #' Convert a string vector of paths to a string containing comma separated paths + path - paste(path, collapse = ,) + + jrdd - callJMethod(sc, objectFile, path, getMinSplits(sc, minSplits)) + # Assume the RDD contains serialized R objects. + RDD(jrdd, byte) +} + +#' Create an RDD from a homogeneous list or vector. +#' +#' This function creates an RDD from a local homogeneous list in R. The elements +#' in the list are split into \code{numSlices} slices and distributed to nodes +#' in the cluster. 
+#' +#' @param sc SparkContext to use +#' @param coll collection to parallelize +#' @param numSlices number of partitions to create in the RDD +#' @return an RDD created from this collection +#' @export +#' @examples +#'\dontrun{ +#' sc - sparkR.init() +#' rdd - parallelize(sc, 1:10, 2) +#' # The RDD should contain 10 elements +#' length(rdd) +#'} +parallelize - function(sc, coll, numSlices = 1) { + # TODO: bound/safeguard numSlices + # TODO: unit tests for if the split works for all primitives + # TODO: support matrix, data frame, etc + if ((!is.list(coll) !is.vector(coll)) || is.data.frame(coll)) { +if (is.data.frame(coll)) { + message(paste(context.R: A data frame is parallelized by columns.)) +} else { + if (is.matrix(coll)) { +message(paste(context.R: A matrix is parallelized by elements.)) + } else { +message(paste(context.R: parallelize() currently only supports lists and vectors., + Calling as.list() to coerce coll into a list.)) + } +} +coll - as.list(coll) + } + + if (numSlices length(coll)) +numSlices - length(coll) + + sliceLen - ceiling(length(coll) / numSlices) + slices - split(coll, rep(1:(numSlices + 1), each =
[1/7] spark git commit: [SPARK-5654] Integrate SparkR
Repository: spark Updated Branches: refs/heads/master 1b2aab8d5 - 2fe0a1aae http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/api/r/SerDe.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala new file mode 100644 index 000..ccb2a37 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.api.r + +import java.io.{DataInputStream, DataOutputStream} +import java.sql.{Date, Time} + +import scala.collection.JavaConversions._ + +/** + * Utility functions to serialize, deserialize objects to / from R + */ +private[spark] object SerDe { + + // Type mapping from R to Java + // + // NULL - void + // integer - Int + // character - String + // logical - Boolean + // double, numeric - Double + // raw - Array[Byte] + // Date - Date + // POSIXlt/POSIXct - Time + // + // list[T] - Array[T], where T is one of above mentioned types + // environment - Map[String, T], where T is a native type + // jobj - Object, where jobj is an object created in the backend + + def readObjectType(dis: DataInputStream): Char = { +dis.readByte().toChar + } + + def readObject(dis: DataInputStream): Object = { +val dataType = readObjectType(dis) +readTypedObject(dis, dataType) + } + + def readTypedObject( + dis: DataInputStream, + dataType: Char): Object = { +dataType match { + case 'n' = null + case 'i' = new java.lang.Integer(readInt(dis)) + case 'd' = new java.lang.Double(readDouble(dis)) + case 'b' = new java.lang.Boolean(readBoolean(dis)) + case 'c' = readString(dis) + case 'e' = readMap(dis) + case 'r' = readBytes(dis) + case 'l' = readList(dis) + case 'D' = readDate(dis) + case 't' = readTime(dis) + case 'j' = JVMObjectTracker.getObject(readString(dis)) + case _ = throw new IllegalArgumentException(sInvalid type $dataType) +} + } + + def readBytes(in: DataInputStream): Array[Byte] = { +val len = readInt(in) +val out = new Array[Byte](len) +val bytesRead = in.readFully(out) +out + } + + def readInt(in: DataInputStream): Int = { +in.readInt() + } + + def readDouble(in: DataInputStream): Double = { +in.readDouble() + } + + def readString(in: DataInputStream): String = { +val len = in.readInt() +val asciiBytes = new Array[Byte](len) +in.readFully(asciiBytes) +assert(asciiBytes(len - 1) == 0) +val str = new String(asciiBytes.dropRight(1).map(_.toChar)) 
+str + } + + def readBoolean(in: DataInputStream): Boolean = { +val intVal = in.readInt() +if (intVal == 0) false else true + } + + def readDate(in: DataInputStream): Date = { +Date.valueOf(readString(in)) + } + + def readTime(in: DataInputStream): Time = { +val t = in.readDouble() +new Time((t * 1000L).toLong) + } + + def readBytesArr(in: DataInputStream): Array[Array[Byte]] = { +val len = readInt(in) +(0 until len).map(_ = readBytes(in)).toArray + } + + def readIntArr(in: DataInputStream): Array[Int] = { +val len = readInt(in) +(0 until len).map(_ = readInt(in)).toArray + } + + def readDoubleArr(in: DataInputStream): Array[Double] = { +val len = readInt(in) +(0 until len).map(_ = readDouble(in)).toArray + } + + def readBooleanArr(in: DataInputStream): Array[Boolean] = { +val len = readInt(in) +(0 until len).map(_ = readBoolean(in)).toArray + } + + def readStringArr(in: DataInputStream): Array[String] = { +val len = readInt(in) +(0 until len).map(_ = readString(in)).toArray + } + + def readList(dis: DataInputStream): Array[_] = { +val arrType = readObjectType(dis) +arrType match { + case 'i' = readIntArr(dis) + case 'c' = readStringArr(dis) + case 'd' = readDoubleArr(dis) + case 'b' = readBooleanArr(dis) + case 'j' = readStringArr(dis).map(x = JVMObjectTracker.getObject(x)) + case 'r' = readBytesArr(dis) + case _ = throw new