git commit: SPARK-2738. Remove redundant imports in BlockManagerSuite
Repository: spark Updated Branches: refs/heads/master 149910111 - cb9e7d5af SPARK-2738. Remove redundant imports in BlockManagerSuite Author: Sandy Ryza sa...@cloudera.com Closes #1642 from sryza/sandy-spark-2738 and squashes the following commits: a923e4e [Sandy Ryza] SPARK-2738. Remove redundant imports in BlockManagerSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb9e7d5a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb9e7d5a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb9e7d5a Branch: refs/heads/master Commit: cb9e7d5aff2ce9cb501a2825651224311263ce20 Parents: 1499101 Author: Sandy Ryza sa...@cloudera.com Authored: Thu Jul 31 23:12:38 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Thu Jul 31 23:12:38 2014 -0700 -- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb9e7d5a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index dd4fd53..58ea0cc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -21,9 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays import akka.actor._ -import org.apache.spark.SparkConf -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} -import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} import org.scalatest.concurrent.Eventually._
git commit: SPARK-983. Support external sorting in sortByKey()
Repository: spark Updated Branches: refs/heads/master 8ff4417f7 - 72e336997 SPARK-983. Support external sorting in sortByKey() This patch simply uses the ExternalSorter class from sort-based shuffle. Closes #931 and Closes #1090 Author: Matei Zaharia ma...@databricks.com Closes #1677 from mateiz/spark-983 and squashes the following commits: 96b3fda [Matei Zaharia] SPARK-983. Support external sorting in sortByKey() Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e33699 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e33699 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e33699 Branch: refs/heads/master Commit: 72e33699732496fa71e8c8b0de2203b908423fb2 Parents: 8ff4417 Author: Matei Zaharia ma...@databricks.com Authored: Fri Aug 1 00:16:18 2014 -0700 Committer: Matei Zaharia ma...@databricks.com Committed: Fri Aug 1 00:16:18 2014 -0700 -- .../spark/shuffle/hash/HashShuffleReader.scala | 22 +--- .../util/collection/ExternalSorterSuite.scala | 10 + 2 files changed, 20 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72e33699/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index e32ad9c..7c9dc8e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -20,6 +20,7 @@ package org.apache.spark.shuffle.hash import org.apache.spark.{InterruptibleIterator, TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader} +import org.apache.spark.util.collection.ExternalSorter private[spark] class HashShuffleReader[K, C]( handle: BaseShuffleHandle[K, _, C], @@ -35,8 +36,8 @@ private[spark] class HashShuffleReader[K, C]( /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { -val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, - Serializer.getSerializer(dep.serializer)) +val ser = Serializer.getSerializer(dep.serializer) +val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser) val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { @@ -54,16 +55,13 @@ private[spark] class HashShuffleReader[K, C]( // Sort the output if there is a sort ordering defined. dep.keyOrdering match { case Some(keyOrd: Ordering[K]) = -// Define a Comparator for the whole record based on the key Ordering. -val cmp = new Ordering[Product2[K, C]] { - override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = { -keyOrd.compare(o1._1, o2._1) - } -} -val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray -// TODO: do external sort. -scala.util.Sorting.quickSort(sortBuffer)(cmp) -sortBuffer.iterator +// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, +// the ExternalSorter won't spill to disk. 
+val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) +sorter.write(aggregatedIter) +context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled +context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled +sorter.iterator case None = aggregatedIter } http://git-wip-us.apache.org/repos/asf/spark/blob/72e33699/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index ddb5df4..65a71e5 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -190,6 +190,11 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { fail(sValue 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}) } } + +// sortByKey - should spill ~17 times +val rddE = sc.parallelize(0 until 10).map(i = (i/4, i)) +val resultE = rddE.sortByKey().collect().toSeq +assert(resultE === (0 until 10).map(i = (i/4,
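To make the change above concrete, here is a minimal, self-contained sketch of the spill-and-merge idea that ExternalSorter implements: buffer key/value pairs in memory, write each full buffer to disk as a sorted run, and merge the runs with a priority queue. This is illustration only, not Spark's ExternalSorter; every name in it is invented for the example.

```
import java.io._
import scala.collection.mutable
import scala.io.Source

object ExternalSortSketch {
  // Sort (key, value) pairs by key while keeping at most maxInMemory pairs in memory.
  def externalSortByKey(input: Iterator[(Int, Int)], maxInMemory: Int): Iterator[(Int, Int)] = {
    val spills = mutable.ArrayBuffer[File]()
    val buffer = mutable.ArrayBuffer[(Int, Int)]()

    // Write the current buffer to a temporary file as a sorted run.
    def spill(): Unit = {
      val file = File.createTempFile("spill", ".txt")
      file.deleteOnExit()
      val out = new PrintWriter(file)
      try buffer.sortBy(_._1).foreach { case (k, v) => out.println(s"$k $v") } finally out.close()
      spills += file
      buffer.clear()
    }

    input.foreach { kv =>
      buffer += kv
      if (buffer.size >= maxInMemory) spill()
    }
    if (buffer.nonEmpty) spill()

    // Merge the sorted runs with a min-heap keyed on the next key of each run.
    val runs = spills.map(f => Source.fromFile(f).getLines()).toIndexedSeq
    val heap = mutable.PriorityQueue.empty[((Int, Int), Int)](
      Ordering.by[((Int, Int), Int), Int] { case ((k, _), _) => -k })
    def refill(i: Int): Unit = if (runs(i).hasNext) {
      val Array(k, v) = runs(i).next().split(" ")
      heap.enqueue(((k.toInt, v.toInt), i))
    }
    runs.indices.foreach(refill)

    new Iterator[(Int, Int)] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): (Int, Int) = { val (kv, i) = heap.dequeue(); refill(i); kv }
    }
  }

  def main(args: Array[String]): Unit = {
    val data = scala.util.Random.shuffle((0 until 100).map(i => (i / 4, i))).iterator
    println(externalSortByKey(data, maxInMemory = 16).take(10).toList)
  }
}
```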
git commit: SPARK-2134: Report metrics before application finishes
Repository: spark Updated Branches: refs/heads/master 72e336997 - f1957e116 SPARK-2134: Report metrics before application finishes Author: Rahul Singhal rahul.sing...@guavus.com Closes #1076 from rahulsinghaliitd/SPARK-2134 and squashes the following commits: 15f18b6 [Rahul Singhal] SPARK-2134: Report metrics before application finishes Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1957e11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1957e11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1957e11 Branch: refs/heads/master Commit: f1957e11652a537efd40771f843591a4c9341014 Parents: 72e3369 Author: Rahul Singhal rahul.sing...@guavus.com Authored: Fri Aug 1 00:33:15 2014 -0700 Committer: Matei Zaharia ma...@databricks.com Committed: Fri Aug 1 00:33:15 2014 -0700 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 1 + core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 ++ core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1 + .../org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 1 + core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 4 .../main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 4 core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala | 4 .../main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 4 core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala | 2 ++ .../scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 2 ++ core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 1 + .../main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 4 13 files changed, 34 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b25f081..f5a0549 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -990,6 +990,7 @@ class SparkContext(config: SparkConf) extends Logging { val dagSchedulerCopy = dagScheduler dagScheduler = null if (dagSchedulerCopy != null) { + env.metricsSystem.report() metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 21f8667..a70ecdb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -154,6 +154,8 @@ private[spark] class Master( } override def postStop() { +masterMetricsSystem.report() +applicationMetricsSystem.report() // prevent the CompleteRecovery message sending to restarted master if (recoveryCompletionTask != null) { recoveryCompletionTask.cancel() http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ce42544..fb5252d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -357,6 +357,7 @@ private[spark] class Worker( } override def postStop() { +metricsSystem.report() registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 860b47e..af736de 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -88,6 +88,7 @@ private[spark] class
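A condensed, hypothetical view of what the remaining (truncated) part of this diff does: every metrics sink gains a report() method that flushes metrics once, and long-running components (SparkContext, Master, Worker, executors) call it from their stop/postStop paths so the final values are published before the process exits. The sketch below uses the Codahale metrics library that Spark's sinks wrap; the trait and class names are invented for the example.

```
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// A sink reports metrics periodically while running; report() forces one final flush on shutdown.
trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit // the method this patch adds to every sink
}

class ConsoleSinkSketch(registry: MetricRegistry) extends Sink {
  private val reporter = ConsoleReporter.forRegistry(registry).build()
  override def start(): Unit = reporter.start(10, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report() // one-shot flush, independent of the polling period
}
```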
git commit: [SPARK-2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent
Repository: spark Updated Branches: refs/heads/master f1957e116 - 284771efb [Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent [SPARK-2557](https://issues.apache.org/jira/browse/SPARK-2557) Author: Ye Xianjin advance...@gmail.com Closes #1464 from advancedxy/SPARK-2557 and squashes the following commits: d844d67 [Ye Xianjin] add local-*-n-failures, bad-local-n, bad-local-n-failures test case 3bbc668 [Ye Xianjin] fix LOCAL_N_REGEX regular expression and make local_n_failures accept * as all cores on the computer Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/284771ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/284771ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/284771ef Branch: refs/heads/master Commit: 284771efbef2d6b22212afd49dd62732a2cf52a8 Parents: f1957e1 Author: Ye Xianjin advance...@gmail.com Authored: Fri Aug 1 00:34:39 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Aug 1 00:34:39 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 10 ++--- .../SparkContextSchedulerCreationSuite.scala| 23 2 files changed, 30 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f5a0549..0e51356 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1452,9 +1452,9 @@ object SparkContext extends Logging { /** Creates a task scheduler based on a given master URL. Extracted for testing. 
*/ private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = { // Regular expression used for local[N] and local[*] master formats -val LOCAL_N_REGEX = local\[([0-9\*]+)\].r +val LOCAL_N_REGEX = local\[([0-9]+|\*)\].r // Regular expression for local[N, maxRetries], used in tests with failing tasks -val LOCAL_N_FAILURES_REGEX = local\[([0-9]+)\s*,\s*([0-9]+)\].r +val LOCAL_N_FAILURES_REGEX = local\[([0-9]+|\*)\s*,\s*([0-9]+)\].r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally val LOCAL_CLUSTER_REGEX = local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*].r // Regular expression for connecting to Spark deploy clusters @@ -1484,8 +1484,12 @@ object SparkContext extends Logging { scheduler case LOCAL_N_FAILURES_REGEX(threads, maxFailures) = +def localCpuCount = Runtime.getRuntime.availableProcessors() +// local[*, M] means the number of cores on the computer with M failures +// local[N, M] means exactly N threads with M failures +val threadCount = if (threads == *) localCpuCount else threads.toInt val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) -val backend = new LocalBackend(scheduler, threads.toInt) +val backend = new LocalBackend(scheduler, threadCount) scheduler.initialize(backend) scheduler http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 67e3be2..4b727e5 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -68,6 +68,15 @@ class SparkContextSchedulerCreationSuite } } + test(local-*-n-failures) { +val sched = createTaskScheduler(local[* ,2]) +assert(sched.maxTaskFailures === 2) +sched.backend match { + case s: LocalBackend = assert(s.totalCores === Runtime.getRuntime.availableProcessors()) + case _ = fail() +} + } + test(local-n-failures) { val sched = createTaskScheduler(local[4, 2]) assert(sched.maxTaskFailures === 2) @@ -77,6 +86,20 @@ class SparkContextSchedulerCreationSuite } } + test(bad-local-n) { +val e = intercept[SparkException] { + createTaskScheduler(local[2*]) +} +assert(e.getMessage.contains(Could not parse Master URL)) + } + + test(bad-local-n-failures) { +val e = intercept[SparkException] { + createTaskScheduler(local[2*,4]) +} +assert(e.getMessage.contains(Could not parse Master URL)) + }
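The corrected patterns are easier to read outside the diff. The sketch below mirrors the regexes and the `*` handling from the patch in a standalone object; the object name and the describe helper are made up for the example, this is not the SparkContext code.

```
object MasterUrlSketch {
  // "local[N]" or "local[*]"
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // "local[N, maxFailures]" or "local[*, maxFailures]"
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  private def threadCount(threads: String): Int =
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt

  def describe(master: String): String = master match {
    case LOCAL_N_REGEX(threads) =>
      s"local, ${threadCount(threads)} threads"
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      s"local, ${threadCount(threads)} threads, ${maxFailures.toInt} max task failures"
    case _ =>
      s"Could not parse Master URL: '$master'"
  }

  def main(args: Array[String]): Unit =
    Seq("local[4]", "local[*]", "local[* ,2]", "local[2*]", "local[2*,4]").foreach { m =>
      println(f"$m%-12s -> ${describe(m)}")
    }
}
```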
git commit: [SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue
Repository: spark Updated Branches: refs/heads/master 284771efb - a32f0fb73 [SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue This PR updates previous Manifest for KafkaInputDStream's Decoder to ClassTag, also fix the problem addressed in [SPARK-2103](https://issues.apache.org/jira/browse/SPARK-2103). Previous Java interface cannot actually get the type of Decoder, so when using this Manifest to reconstruct the decode object will meet reflection exception. Also for other two Java interfaces, ClassTag[String] is useless because calling Scala API will get the right implicit ClassTag. Current Kafka unit test cannot actually verify the interface. I've tested these interfaces in my local and distribute settings. Author: jerryshao saisai.s...@intel.com Closes #1508 from jerryshao/SPARK-2103 and squashes the following commits: e90c37b [jerryshao] Add Mima excludes 7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder and fix Decoder construct issue when using Java API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a32f0fb7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a32f0fb7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a32f0fb7 Branch: refs/heads/master Commit: a32f0fb73a739c56208cafcd9f08618fb6dd8859 Parents: 284771e Author: jerryshao saisai.s...@intel.com Authored: Fri Aug 1 04:32:46 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Fri Aug 1 04:32:46 2014 -0700 -- .../spark/streaming/kafka/KafkaInputDStream.scala | 14 +++--- .../apache/spark/streaming/kafka/KafkaUtils.scala | 16 +--- project/MimaExcludes.scala | 7 ++- 3 files changed, 18 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 38095e8..e20e2c8 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kafka import scala.collection.Map -import scala.reflect.ClassTag +import scala.reflect.{classTag, ClassTag} import java.util.Properties import java.util.concurrent.Executors @@ -48,8 +48,8 @@ private[streaming] class KafkaInputDStream[ K: ClassTag, V: ClassTag, - U : Decoder[_]: Manifest, - T : Decoder[_]: Manifest]( + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], @@ -66,8 +66,8 @@ private[streaming] class KafkaReceiver[ K: ClassTag, V: ClassTag, - U : Decoder[_]: Manifest, - T : Decoder[_]: Manifest]( + U : Decoder[_]: ClassTag, + T : Decoder[_]: ClassTag]( kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel @@ -103,10 +103,10 @@ class KafkaReceiver[ tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams(group.id)) } -val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) +val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[K]] -val valueDecoder = 
manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) +val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[V]] http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 86bb91f..48668f7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -65,7 +65,7 @@ object KafkaUtils { *in its own thread. * @param storageLevel Storage level to use for storing the received objects */ - def createStream[K: ClassTag, V: ClassTag, U : Decoder[_]: Manifest, T :
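The core of the fix is that a ClassTag, unlike the Manifest that Java callers could not supply correctly, yields a usable runtimeClass for reflective construction. A minimal standalone illustration of that pattern follows; the Decoder trait and StringDecoderSketch are stand-ins invented for the example, not Kafka's classes.

```
import java.util.Properties
import scala.reflect.{classTag, ClassTag}

// Stand-in for kafka.serializer.Decoder: something constructed from a Properties object.
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

class StringDecoderSketch(props: Properties) extends Decoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

object ClassTagConstructionSketch {
  // Capture the concrete decoder type with a ClassTag and instantiate it reflectively,
  // the same way the patched KafkaReceiver builds its key and value decoders.
  def newDecoder[U <: Decoder[_]: ClassTag](props: Properties): U =
    classTag[U].runtimeClass
      .getConstructor(classOf[Properties])
      .newInstance(props)
      .asInstanceOf[U]

  def main(args: Array[String]): Unit = {
    val decoder = newDecoder[StringDecoderSketch](new Properties)
    println(decoder.fromBytes("hello".getBytes("UTF-8")))
  }
}
```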
[1/2] [SPARK-2179][SQL] A minor refactoring of Java data type APIs (2179 follow-up).
Repository: spark Updated Branches: refs/heads/master 8d338f64c - c41fdf04f http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala index d1aa3c8..77353f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.types.util import org.apache.spark.sql._ -import org.apache.spark.sql.api.java.types.{DataType = JDataType, StructField = JStructField} +import org.apache.spark.sql.api.java.{DataType = JDataType, StructField = JStructField} import scala.collection.JavaConverters._ @@ -74,37 +74,37 @@ protected[sql] object DataTypeConversions { * Returns the equivalent DataType in Scala for the given DataType in Java. */ def asScalaDataType(javaDataType: JDataType): DataType = javaDataType match { -case stringType: org.apache.spark.sql.api.java.types.StringType = +case stringType: org.apache.spark.sql.api.java.StringType = StringType -case binaryType: org.apache.spark.sql.api.java.types.BinaryType = +case binaryType: org.apache.spark.sql.api.java.BinaryType = BinaryType -case booleanType: org.apache.spark.sql.api.java.types.BooleanType = +case booleanType: org.apache.spark.sql.api.java.BooleanType = BooleanType -case timestampType: org.apache.spark.sql.api.java.types.TimestampType = +case timestampType: org.apache.spark.sql.api.java.TimestampType = TimestampType -case decimalType: org.apache.spark.sql.api.java.types.DecimalType = +case decimalType: org.apache.spark.sql.api.java.DecimalType = DecimalType -case doubleType: org.apache.spark.sql.api.java.types.DoubleType = +case doubleType: org.apache.spark.sql.api.java.DoubleType = DoubleType -case floatType: org.apache.spark.sql.api.java.types.FloatType = +case floatType: org.apache.spark.sql.api.java.FloatType = FloatType -case byteType: org.apache.spark.sql.api.java.types.ByteType = +case byteType: org.apache.spark.sql.api.java.ByteType = ByteType -case integerType: org.apache.spark.sql.api.java.types.IntegerType = +case integerType: org.apache.spark.sql.api.java.IntegerType = IntegerType -case longType: org.apache.spark.sql.api.java.types.LongType = +case longType: org.apache.spark.sql.api.java.LongType = LongType -case shortType: org.apache.spark.sql.api.java.types.ShortType = +case shortType: org.apache.spark.sql.api.java.ShortType = ShortType -case arrayType: org.apache.spark.sql.api.java.types.ArrayType = +case arrayType: org.apache.spark.sql.api.java.ArrayType = ArrayType(asScalaDataType(arrayType.getElementType), arrayType.isContainsNull) -case mapType: org.apache.spark.sql.api.java.types.MapType = +case mapType: org.apache.spark.sql.api.java.MapType = MapType( asScalaDataType(mapType.getKeyType), asScalaDataType(mapType.getValueType), mapType.isValueContainsNull) -case structType: org.apache.spark.sql.api.java.types.StructType = +case structType: org.apache.spark.sql.api.java.StructType = StructType(structType.getFields.map(asScalaStructField)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java -- diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java 
b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java index 8ee4591..3c92906 100644 --- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java @@ -28,9 +28,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.apache.spark.sql.api.java.types.DataType; -import org.apache.spark.sql.api.java.types.StructField; -import org.apache.spark.sql.api.java.types.StructType; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaSideDataTypeConversionSuite.java -- diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaSideDataTypeConversionSuite.java
git commit: [SQL][SPARK-2212] Hash Outer Join
Repository: spark Updated Branches: refs/heads/master c41fdf04f - 4415722e9 [SQL][SPARK-2212]Hash Outer Join This patch is to support the hash based outer join. Currently, outer join for big relations are resort to `BoradcastNestedLoopJoin`, which is super slow. This PR will create 2 hash tables for both relations in the same partition, which greatly reduce the table scans. Here is the testing code that I used: ``` package org.apache.spark.sql.hive import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql._ case class Record(key: String, value: String) object JoinTablePrepare extends App { import TestHive2._ val rdd = sparkContext.parallelize((1 to 300).map(i = Record(s${i % 828193}, sval_$i))) runSqlHive(SHOW TABLES) runSqlHive(DROP TABLE if exists a) runSqlHive(DROP TABLE if exists b) runSqlHive(DROP TABLE if exists result) rdd.registerAsTable(records) runSqlHive(CREATE TABLE a (key STRING, value STRING) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE .stripMargin) runSqlHive(CREATE TABLE b (key STRING, value STRING) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE .stripMargin) runSqlHive(CREATE TABLE result (key STRING, value STRING) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE .stripMargin) hql(sfrom records | insert into table a | select key, value .stripMargin) hql(sfrom records | insert into table b select key + 10, value .stripMargin) } object JoinTablePerformanceTest extends App { import TestHive2._ hql(SHOW TABLES) hql(set spark.sql.shuffle.partitions=20) val leftOuterJoin = insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key val rightOuterJoin = insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key val fullOuterJoin = insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key val results = (LeftOuterJoin, benchmark(leftOuterJoin)) :: (LeftOuterJoin, benchmark(leftOuterJoin)) :: (RightOuterJoin, benchmark(rightOuterJoin)) :: (RightOuterJoin, benchmark(rightOuterJoin)) :: (FullOuterJoin, benchmark(fullOuterJoin)) :: (FullOuterJoin, benchmark(fullOuterJoin)) :: Nil val explains = hql(sexplain $leftOuterJoin).collect ++ hql(sexplain $rightOuterJoin).collect ++ hql(sexplain $fullOuterJoin).collect println(explains.mkString(,\n)) results.foreach { case (prompt, result) = { println(s$prompt: took ${result._1} ms (${result._2} records)) } } def benchmark(cmd: String) = { val begin = System.currentTimeMillis() val result = hql(cmd) val end = System.currentTimeMillis() val count = hql(select count(1) from result).collect.mkString() ((end - begin), count) } } ``` And the result as shown below: ``` [Physical execution plan:], [InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true], [ Project [key#95,value#98]], [ HashOuterJoin [key#95], [key#97], LeftOuter, None], [ Exchange (HashPartitioning [key#95], 20)], [HiveTableScan [key#95], (MetastoreRelation default, a, None), None], [ Exchange (HashPartitioning [key#97], 20)], [HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None], [Physical execution plan:], [InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true], [ Project [key#102,value#105]], [ HashOuterJoin [key#102], [key#104], RightOuter, None], [ Exchange (HashPartitioning [key#102], 20)], [HiveTableScan 
[key#102], (MetastoreRelation default, a, None), None], [ Exchange (HashPartitioning [key#104], 20)], [HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None], [Physical execution plan:], [InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true], [ Project [key#109,value#112]], [ HashOuterJoin [key#109], [key#111], FullOuter, None], [ Exchange (HashPartitioning [key#109], 20)], [HiveTableScan [key#109], (MetastoreRelation default, a, None), None], [ Exchange (HashPartitioning [key#111], 20)], [HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None] LeftOuterJoin: took 16072 ms ([300] records) LeftOuterJoin: took 14394 ms ([300] records) RightOuterJoin: took 14802 ms ([300] records) RightOuterJoin: took 14747 ms ([300] records) FullOuterJoin: took 17715 ms ([600] records) FullOuterJoin: took 17629 ms ([600] records) ``` Without this PR, the benchmark will run seems never end. Author:
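The idea behind the operator, stripped of the Catalyst machinery: once both sides are hash-partitioned on the join key, each partition can build a hash table per side and emit matched plus unmatched rows directly, instead of repeatedly scanning one side as the nested-loop join does. A toy single-partition version, illustration only and not the HashOuterJoin code:

```
object HashOuterJoinSketch {
  // Full outer join of two already co-partitioned sequences, using one hash table per side.
  def fullOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, Option[A], Option[B])] = {
    val leftTable = left.groupBy(_._1)
    val rightTable = right.groupBy(_._1)
    (leftTable.keySet ++ rightTable.keySet).toSeq.flatMap { key =>
      val ls = leftTable.getOrElse(key, Nil).map(p => Option(p._2))
      val rs = rightTable.getOrElse(key, Nil).map(p => Option(p._2))
      val lefts: Seq[Option[A]] = if (ls.isEmpty) Seq(None) else ls
      val rights: Seq[Option[B]] = if (rs.isEmpty) Seq(None) else rs
      for (l <- lefts; r <- rights) yield (key, l, r)
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(1 -> "A", 2 -> "B", 3 -> "C")
    val b = Seq(2 -> "b", 3 -> "c", 4 -> "d")
    // key 1 has no right-side match, key 4 has no left-side match
    fullOuterJoin(a, b).sortBy(_._1).foreach(println)
  }
}
```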
git commit: [SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder
Repository: spark Updated Branches: refs/heads/master 4415722e9 - 580c7011c [SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder just a match forgot, found after SPARK-2710 , TimestampType can be used by a SchemaRDD generated from JDBC ResultSet Author: chutium teng@gmail.com Closes #1636 from chutium/SPARK-2729 and squashes the following commits: 71af77a [chutium] [SPARK-2729] [SQL] added Timestamp in NullableColumnAccessorSuite 39cf9f8 [chutium] [SPARK-2729] add Timestamp Type into ColumnBuilder TestSuite, ref. #1636 ab6ff97 [chutium] [SPARK-2729] Forgot to match Timestamp type in ColumnBuilder Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/580c7011 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/580c7011 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/580c7011 Branch: refs/heads/master Commit: 580c7011cab6bc93294b6486e778557216bedb10 Parents: 4415722 Author: chutium teng@gmail.com Authored: Fri Aug 1 11:31:44 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Aug 1 11:31:44 2014 -0700 -- .../main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala | 1 + .../apache/spark/sql/columnar/NullableColumnAccessorSuite.scala| 2 +- .../org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 74f5630..c416a74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -154,6 +154,7 @@ private[sql] object ColumnBuilder { case STRING.typeId = new StringColumnBuilder case BINARY.typeId = new BinaryColumnBuilder case GENERIC.typeId = new GenericColumnBuilder + case TIMESTAMP.typeId = new TimestampColumnBuilder }).asInstanceOf[ColumnBuilder] builder.initialize(initialSize, columnName, useCompression) http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala index 35ab14c..3baa6f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -41,7 +41,7 @@ object TestNullableColumnAccessor { class NullableColumnAccessorSuite extends FunSuite { import ColumnarTestUtils._ - Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC, TIMESTAMP).foreach { testNullableColumnAccessor(_) } http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala index d889852..dc813fe 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -37,7 +37,7 @@ object TestNullableColumnBuilder { class NullableColumnBuilderSuite extends FunSuite { import ColumnarTestUtils._ - Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC, TIMESTAMP).foreach { testNullableColumnBuilder(_) }
git commit: SPARK-1612: Fix potential resource leaks
Repository: spark Updated Branches: refs/heads/master baf9ce1a4 - f5d9bea20 SPARK-1612: Fix potential resource leaks JIRA: https://issues.apache.org/jira/browse/SPARK-1612 Move the close statements into a finally block. Author: zsxwing zsxw...@gmail.com Closes #535 from zsxwing/SPARK-1612 and squashes the following commits: ae52f50 [zsxwing] Update to follow the code style 549ba13 [zsxwing] SPARK-1612: Fix potential resource leaks Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5d9bea2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5d9bea2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5d9bea2 Branch: refs/heads/master Commit: f5d9bea20e0db22c09c1191ca44a6471de765739 Parents: baf9ce1 Author: zsxwing zsxw...@gmail.com Authored: Fri Aug 1 13:25:04 2014 -0700 Committer: Matei Zaharia ma...@databricks.com Committed: Fri Aug 1 13:25:04 2014 -0700 -- .../scala/org/apache/spark/util/Utils.scala | 35 1 file changed, 22 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f5d9bea2/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f8fbb3a..30073a8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -286,17 +286,23 @@ private[spark] object Utils extends Logging { out: OutputStream, closeStreams: Boolean = false) { -val buf = new Array[Byte](8192) -var n = 0 -while (n != -1) { - n = in.read(buf) - if (n != -1) { -out.write(buf, 0, n) +try { + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { +n = in.read(buf) +if (n != -1) { + out.write(buf, 0, n) +} + } +} finally { + if (closeStreams) { +try { + in.close() +} finally { + out.close() +} } -} -if (closeStreams) { - in.close() - out.close() } } @@ -868,9 +874,12 @@ private[spark] object Utils extends Logging { val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt) val stream = new FileInputStream(file) -stream.skip(effectiveStart) -stream.read(buff) -stream.close() +try { + stream.skip(effectiveStart) + stream.read(buff) +} finally { + stream.close() +} Source.fromBytes(buff).mkString }
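For reference, here is the same pattern as a standalone method, reconstructed from the diff above: the copy loop runs inside try, and the two close() calls are nested in finally blocks so that a failure while closing the input cannot leak the output stream. Illustration only; the in-memory streams in main are just for the demo.

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object CopyStreamSketch {
  def copyStream(in: InputStream, out: OutputStream, closeStreams: Boolean = false): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) out.write(buf, 0, n)
      }
    } finally {
      if (closeStreams) {
        // Nested try/finally: even if in.close() throws, out.close() still runs.
        try in.close() finally out.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("no leaks".getBytes("UTF-8"))
    val out = new ByteArrayOutputStream()
    copyStream(in, out, closeStreams = true)
    println(out.toString("UTF-8"))
  }
}
```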
git commit: [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop
Repository: spark Updated Branches: refs/heads/master f5d9bea20 - b270309d7 [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop Author: joyyoj suns...@gmail.com Closes #1694 from joyyoj/SPARK-2379 and squashes the following commits: d73790d [joyyoj] SPARK-2379 Fix the bug that streaming's receiver may fall into a dead loop 22e7821 [joyyoj] Merge remote-tracking branch 'apache/master' 3f4a602 [joyyoj] Merge remote-tracking branch 'remotes/apache/master' f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b270309d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b270309d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b270309d Branch: refs/heads/master Commit: b270309d7608fb749e402cd5afd36087446be398 Parents: f5d9bea Author: joyyoj suns...@gmail.com Authored: Fri Aug 1 13:41:55 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Fri Aug 1 13:41:55 2014 -0700 -- .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b270309d/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 09be3a5..1f0244c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -138,7 +138,7 @@ private[streaming] abstract class ReceiverSupervisor( onReceiverStop(message, error) } catch { case t: Throwable = -stop(Error stopping receiver + streamId, Some(t)) +logError(Error stopping receiver + streamId + t.getStackTraceString) } }
git commit: [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop
Repository: spark Updated Branches: refs/heads/branch-1.0 886508d3b - 952e0d698 [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop Author: joyyoj suns...@gmail.com Closes #1694 from joyyoj/SPARK-2379 and squashes the following commits: d73790d [joyyoj] SPARK-2379 Fix the bug that streaming's receiver may fall into a dead loop 22e7821 [joyyoj] Merge remote-tracking branch 'apache/master' 3f4a602 [joyyoj] Merge remote-tracking branch 'remotes/apache/master' f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly (cherry picked from commit b270309d7608fb749e402cd5afd36087446be398) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/952e0d69 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/952e0d69 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/952e0d69 Branch: refs/heads/branch-1.0 Commit: 952e0d69841b6218e7d1b8b23e7d74a4fcb1b381 Parents: 886508d Author: joyyoj suns...@gmail.com Authored: Fri Aug 1 13:41:55 2014 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Fri Aug 1 13:42:16 2014 -0700 -- .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/952e0d69/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 09be3a5..1f0244c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -138,7 +138,7 @@ private[streaming] abstract class ReceiverSupervisor( onReceiverStop(message, error) } catch { case t: Throwable = -stop(Error stopping receiver + streamId, Some(t)) +logError(Error stopping receiver + streamId + t.getStackTraceString) } }
git commit: SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation
Repository: spark Updated Branches: refs/heads/master b270309d7 - 78f2af582 SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation All changes from this PR are by mridulm and are drawn from his work in #1609. This patch is intended to fix all major issues related to shuffle file consolidation that mridulm found, while minimizing changes to the code, with the hope that it may be more easily merged into 1.1. This patch is **not** intended as a replacement for #1609, which provides many additional benefits, including fixes to ExternalAppendOnlyMap, improvements to DiskBlockObjectWriter's API, and several new unit tests. If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable option. Author: Aaron Davidson aa...@databricks.com Closes #1678 from aarondav/consol and squashes the following commits: 53b3f6d [Aaron Davidson] Correct behavior when writing unopened file 701d045 [Aaron Davidson] Rebase with sort-based shuffle 9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f2af58 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f2af58 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f2af58 Branch: refs/heads/master Commit: 78f2af582286b81e6dc9fa9d455ed2b369d933bd Parents: b270309 Author: Aaron Davidson aa...@databricks.com Authored: Fri Aug 1 13:57:19 2014 -0700 Committer: Matei Zaharia ma...@databricks.com Committed: Fri Aug 1 13:57:19 2014 -0700 -- .../spark/shuffle/hash/HashShuffleWriter.scala | 14 ++-- .../spark/shuffle/sort/SortShuffleWriter.scala | 3 +- .../spark/storage/BlockObjectWriter.scala | 53 +++- .../spark/storage/ShuffleBlockManager.scala | 28 --- .../util/collection/ExternalAppendOnlyMap.scala | 2 +- .../spark/util/collection/ExternalSorter.scala | 6 +- .../spark/storage/DiskBlockManagerSuite.scala | 87 +++- .../apache/spark/tools/StoragePerfTester.scala | 5 +- 8 files changed, 146 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 1923f7c..45d3b8b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V]( } /** Close this writer, passing along whether the map completed */ - override def stop(success: Boolean): Option[MapStatus] = { + override def stop(initiallySuccess: Boolean): Option[MapStatus] = { +var success = initiallySuccess try { if (stopping) { return None @@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V]( stopping = true if (success) { try { - return Some(commitWritesAndBuildStatus()) + Some(commitWritesAndBuildStatus()) } catch { case e: Exception = +success = false revertWrites() throw e } } else { revertWrites() -return None +None } } finally { // Release the writers back to the shuffle block manager. 
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V]( var totalBytes = 0L var totalTime = 0L val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter = - writer.commit() - writer.close() + writer.commitAndClose() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() @@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V]( private def revertWrites(): Unit = { if (shuffle != null shuffle.writers != null) { for (writer - shuffle.writers) { -writer.revertPartialWrites() -writer.close() +writer.revertPartialWritesAndClose() } } } http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 42fcd07..9a356d0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++
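A hypothetical, much-simplified model of the writer contract this patch tightens: every write is either committed in full (commitAndClose) or rolled back to the last committed offset (revertPartialWritesAndClose), which is what keeps consolidated shuffle files consistent when a task fails. The class below is invented for illustration and is not Spark's BlockObjectWriter.

```
import java.io.{File, RandomAccessFile}

class RevertableWriterSketch(file: File) {
  private val raf = new RandomAccessFile(file, "rw")
  private var committedLength = raf.length() // everything before this offset is durable

  def write(bytes: Array[Byte]): Unit = {
    raf.seek(raf.length())
    raf.write(bytes)
  }

  // Keep everything written so far and close the file.
  def commitAndClose(): Long = {
    committedLength = raf.length()
    raf.close()
    committedLength
  }

  // Truncate back to the last committed offset, discarding the partial write, then close.
  def revertPartialWritesAndClose(): Long = {
    raf.setLength(committedLength)
    raf.close()
    committedLength
  }
}

object RevertableWriterSketch {
  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("segment", ".bin"); f.deleteOnExit()
    val w = new RevertableWriterSketch(f)
    w.write(Array[Byte](1, 2, 3))
    println(w.revertPartialWritesAndClose()) // 0: the uncommitted bytes are truncated away
  }
}
```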
git commit: [SPARK-2786][mllib] Python correlations
Repository: spark Updated Branches: refs/heads/master 78f2af582 - d88e69561 [SPARK-2786][mllib] Python correlations Author: Doris Xin doris.s@gmail.com Closes #1713 from dorx/pythonCorrelation and squashes the following commits: 5f1e60c [Doris Xin] reviewer comments. 46ff6eb [Doris Xin] reviewer comments. ad44085 [Doris Xin] style fix e69d446 [Doris Xin] fixed missed conflicts. eb5bf56 [Doris Xin] merge master cc9f725 [Doris Xin] units passed. 9141a63 [Doris Xin] WIP2 d199f1f [Doris Xin] Moved correlation names into a public object cd163d6 [Doris Xin] WIP Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d88e6956 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d88e6956 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d88e6956 Branch: refs/heads/master Commit: d88e69561367d65e1a2b94527b80a1f65a2cba90 Parents: 78f2af5 Author: Doris Xin doris.s@gmail.com Authored: Fri Aug 1 15:02:17 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Aug 1 15:02:17 2014 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 39 ++- .../apache/spark/mllib/stat/Statistics.scala| 10 +- .../mllib/stat/correlation/Correlation.scala| 49 + .../mllib/api/python/PythonMLLibAPISuite.scala | 21 +++- python/pyspark/mllib/_common.py | 6 +- python/pyspark/mllib/stat.py| 104 +++ 6 files changed, 199 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d2e8ccf..122925d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ -import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors} import org.apache.spark.mllib.random.{RandomRDDGenerators = RG} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.mllib.stat.correlation.CorrelationNames import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable { jsc: JavaSparkContext, path: String, minPartitions: Int): JavaRDD[Array[Byte]] = -MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD() +MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint) private def trainRegressionModel( trainFunc: (RDD[LabeledPoint], Vector) = GeneralizedLinearModel, @@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable { ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + /** + * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String). 
+ * Returns the correlation matrix serialized into a byte array understood by deserializers in + * pyspark. + */ + def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = { +val inputMatrix = X.rdd.map(deserializeDoubleVector(_)) +val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method)) +serializeDoubleMatrix(to2dArray(result)) + } + + /** + * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String). + */ + def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = { +val xDeser = x.rdd.map(deserializeDouble(_)) +val yDeser = y.rdd.map(deserializeDouble(_)) +Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method)) + } + + // used by the corr methods to retrieve the name of the correlation method passed in via pyspark + private def getCorrNameOrDefault(method: String) = { +if (method == null) CorrelationNames.defaultCorrName else method + } + + // Reformat a Matrix into Array[Array[Double]] for serialization + private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = { +val values = matrix.toArray +
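For readers who have not used Statistics.corr before: with the default "pearson" method it computes the ordinary Pearson coefficient for a pair of series (or, for an RDD of vectors, the pairwise matrix of such coefficients). A tiny local illustration of the quantity being computed, independent of MLlib and with names invented for the example:

```
object PearsonSketch {
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.nonEmpty && x.length == y.length, "series must be non-empty and the same length")
    val n = x.length
    val mx = x.sum / n
    val my = y.sum / n
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    cov / (sx * sy)
  }

  def main(args: Array[String]): Unit = {
    val x = Seq(1.0, 2.0, 3.0, 4.0, 5.0)
    println(pearson(x, x.map(_ * 2)))   // 1.0, perfectly correlated
    println(pearson(x, x.map(5.0 - _))) // -1.0, perfectly anti-correlated
  }
}
```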
git commit: [SPARK-2796] [mllib] DecisionTree bug fix: ordered categorical features
Repository: spark Updated Branches: refs/heads/master d88e69561 - 7058a5393 [SPARK-2796] [mllib] DecisionTree bug fix: ordered categorical features Bug: In DecisionTree, the method sequentialBinSearchForOrderedCategoricalFeatureInClassification() indexed bins from 0 to (math.pow(2, featureCategories.toInt - 1) - 1). This upper bound is the bound for unordered categorical features, not ordered ones. The upper bound should be the arity (i.e., max value) of the feature. Added new test to DecisionTreeSuite to catch this: regression stump with categorical variables of arity 2 Bug fix: Modified upper bound discussed above. Also: Small improvements to coding style in DecisionTree. CC mengxr manishamde Author: Joseph K. Bradley joseph.kurata.brad...@gmail.com Closes #1720 from jkbradley/decisiontree-bugfix2 and squashes the following commits: 225822f [Joseph K. Bradley] Bug: In DecisionTree, the method sequentialBinSearchForOrderedCategoricalFeatureInClassification() indexed bins from 0 to (math.pow(2, featureCategories.toInt - 1) - 1). This upper bound is the bound for unordered categorical features, not ordered ones. The upper bound should be the arity (i.e., max value) of the feature. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7058a539 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7058a539 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7058a539 Branch: refs/heads/master Commit: 7058a5393bccc2f917189fa9b4cf7f314410b0de Parents: d88e695 Author: Joseph K. Bradley joseph.kurata.brad...@gmail.com Authored: Fri Aug 1 15:52:21 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Aug 1 15:52:21 2014 -0700 -- .../apache/spark/mllib/tree/DecisionTree.scala | 45 .../spark/mllib/tree/DecisionTreeSuite.scala| 29 + 2 files changed, 56 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7058a539/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala index 7d123dd..382e76a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala @@ -498,7 +498,7 @@ object DecisionTree extends Serializable with Logging { val bin = binForFeatures(mid) val lowThreshold = bin.lowSplit.threshold val highThreshold = bin.highSplit.threshold - if ((lowThreshold feature) (highThreshold = feature)){ + if ((lowThreshold feature) (highThreshold = feature)) { return mid } else if (lowThreshold = feature) { @@ -522,28 +522,36 @@ object DecisionTree extends Serializable with Logging { } /** - * Sequential search helper method to find bin for categorical feature. + * Sequential search helper method to find bin for categorical feature + * (for classification and regression). 
*/ - def sequentialBinSearchForOrderedCategoricalFeatureInClassification(): Int = { + def sequentialBinSearchForOrderedCategoricalFeature(): Int = { val featureCategories = strategy.categoricalFeaturesInfo(featureIndex) -val numCategoricalBins = math.pow(2.0, featureCategories - 1).toInt - 1 +val featureValue = labeledPoint.features(featureIndex) var binIndex = 0 -while (binIndex numCategoricalBins) { +while (binIndex featureCategories) { val bin = bins(featureIndex)(binIndex) val categories = bin.highSplit.categories - val features = labeledPoint.features - if (categories.contains(features(featureIndex))) { + if (categories.contains(featureValue)) { return binIndex } binIndex += 1 } +if (featureValue 0 || featureValue = featureCategories) { + throw new IllegalArgumentException( +sDecisionTree given invalid data: + +s Feature $featureIndex is categorical with values in + +s {0,...,${featureCategories - 1}, + +s but a data point gives it value $featureValue.\n + + Bad data point: + labeledPoint.toString) +} -1 } if (isFeatureContinuous) { // Perform binary search for finding bin for continuous features. val binIndex = binarySearchForBins() -if (binIndex == -1){ +if (binIndex == -1) { throw new UnknownError(no bin was found for continuous variable.) } binIndex @@
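A hypothetical, stripped-down version of the search being fixed, to make the bound concrete: for an ordered categorical feature there is one bin per category, so the loop must run up to the feature's arity. With the old bound of 2^(arity - 1) - 1, an arity-2 feature would only ever check bin 0 and never find category 1, which is exactly the regression-stump case the new test covers. Names below are invented for the illustration.

```
object OrderedCategoricalBinSketch {
  // Each bin covers exactly one category value for an ordered categorical feature.
  def findBin(featureValue: Double, binCategories: IndexedSeq[Set[Double]], featureArity: Int): Int = {
    var binIndex = 0
    while (binIndex < featureArity) { // fixed upper bound: the arity, not math.pow(2, arity - 1) - 1
      if (binCategories(binIndex).contains(featureValue)) return binIndex
      binIndex += 1
    }
    require(featureValue >= 0 && featureValue < featureArity,
      s"categorical feature value $featureValue is outside {0,...,${featureArity - 1}}")
    -1
  }

  def main(args: Array[String]): Unit = {
    val bins = IndexedSeq(Set(0.0), Set(1.0)) // arity-2 feature, one category per bin
    println(findBin(1.0, bins, featureArity = 2)) // 1; the old bound (2^1 - 1 = 1) stopped after bin 0
  }
}
```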
git commit: [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD
Repository: spark Updated Branches: refs/heads/master 7058a5393 - 880eabec3 [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD Convert Row in JavaSchemaRDD into Array[Any] and unpickle them as tuple in Python, then convert them into namedtuple, so use can access fields just like attributes. This will let nested structure can be accessed as object, also it will reduce the size of serialized data and better performance. root |-- field1: integer (nullable = true) |-- field2: string (nullable = true) |-- field3: struct (nullable = true) ||-- field4: integer (nullable = true) ||-- field5: array (nullable = true) |||-- element: integer (containsNull = false) |-- field6: array (nullable = true) ||-- element: struct (containsNull = false) |||-- field7: string (nullable = true) Then we can access them by row.field3.field5[0] or row.field6[5].field7 It also will infer the schema in Python, convert Row/dict/namedtuple/objects into tuple before serialization, then call applySchema in JVM. During inferSchema(), the top level of dict in row will be StructType, but any nested dictionary will be MapType. You can use pyspark.sql.Row to convert unnamed structure into Row object, make the RDD can be inferable. Such as: ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1])) Or you could use Row to create a class just like namedtuple, for example: Person = Row(name, age) ctx.inferSchema(rdd.map(lambda x: Person(*x))) Also, you can call applySchema to apply an schema to a RDD of tuple/list and turn it into a SchemaRDD. The `schema` should be StructType, see the API docs for details. schema = StructType([StructField(name, StringType, True), StructType(age, IntegerType, True)]) ctx.applySchema(rdd, schema) PS: In order to use namedtuple to inferSchema, you should make namedtuple picklable. 
Author: Davies Liu davies@gmail.com Closes #1598 from davies/nested and squashes the following commits: f1d15b6 [Davies Liu] verify schema with the first few rows 8852aaf [Davies Liu] check type of schema abe9e6e [Davies Liu] address comments 61b2292 [Davies Liu] add @deprecated to pythonToJavaMap 1e5b801 [Davies Liu] improve cache of classes 51aa135 [Davies Liu] use Row to infer schema e9c0d5c [Davies Liu] remove string typed schema 353a3f2 [Davies Liu] fix code style 63de8f8 [Davies Liu] fix typo c79ca67 [Davies Liu] fix serialization of nested data 6b258b5 [Davies Liu] fix pep8 9d8447c [Davies Liu] apply schema provided by string of names f5df97f [Davies Liu] refactor, address comments 9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python 84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into nested 0eaaf56 [Davies Liu] fix doc tests b3559b4 [Davies Liu] use generated Row instead of namedtuple c4ddc30 [Davies Liu] fix conflict between name of fields and variables 7f6f251 [Davies Liu] address all comments d69d397 [Davies Liu] refactor 2cc2d45 [Davies Liu] refactor 182fb46 [Davies Liu] refactor bc6e9e1 [Davies Liu] switch to new Schema API 547bf3e [Davies Liu] Merge branch 'master' into nested a435b5a [Davies Liu] add docs and code refactor 2c8debc [Davies Liu] Merge branch 'master' into nested 644665a [Davies Liu] use tuple and namedtuple for schemardd Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/880eabec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/880eabec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/880eabec Branch: refs/heads/master Commit: 880eabec37c69ce4e9594d7babfac291b0f93f50 Parents: 7058a53 Author: Davies Liu davies@gmail.com Authored: Fri Aug 1 18:47:41 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Aug 1 18:47:41 2014 -0700 -- .../org/apache/spark/api/python/PythonRDD.scala | 69 +- python/pyspark/rdd.py |8 +- python/pyspark/sql.py | 1258 +- .../scala/org/apache/spark/sql/SQLContext.scala | 87 +- .../scala/org/apache/spark/sql/SchemaRDD.scala | 18 +- 5 files changed, 996 insertions(+), 444 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/880eabec/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 94d666a..fe9a9e5 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,7 +25,7 @@ import java.util.{List = JList, ArrayList = JArrayList, Map = JMap, Collectio import
git commit: [SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).
Repository: spark Updated Branches: refs/heads/master 880eabec3 - 3822f33f3 [SPARK-2212][SQL] Hash Outer Join (follow-up bug fix). We need to carefully set the outputPartitioning of the HashOuterJoin operator. Otherwise, we may not correctly handle nulls. Author: Yin Huai h...@cse.ohio-state.edu Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits: ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin operator. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3822f33f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3822f33f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3822f33f Branch: refs/heads/master Commit: 3822f33f3ce1428703a4796d7a119b40a6b32259 Parents: 880eabe Author: Yin Huai h...@cse.ohio-state.edu Authored: Fri Aug 1 18:52:01 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Aug 1 18:52:01 2014 -0700 -- .../org/apache/spark/sql/execution/joins.scala | 9 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 99 .../scala/org/apache/spark/sql/TestData.scala | 8 ++ 3 files changed, 114 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3822f33f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 82f0a74..cc138c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -158,7 +158,12 @@ case class HashOuterJoin( left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def outputPartitioning: Partitioning = left.outputPartitioning + override def outputPartitioning: Partitioning = joinType match { +case LeftOuter => left.outputPartitioning +case RightOuter => right.outputPartitioning +case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions) +case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") + } override def requiredChildDistribution = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil @@ -309,7 +314,7 @@ case class HashOuterJoin( leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST)) } -case x => throw new Exception(s"Need to add implementation for $x") +case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/3822f33f/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 0378906..2fc8058 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -197,6 +197,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { (4, "D", 4, "d") :: (5, "E", null, null) :: (6, "F", null, null) :: Nil) + +// Make sure we are choosing left.outputPartitioning as the +// outputPartitioning for the outer join operator.
+checkAnswer( + sql( + """ + |SELECT l.N, count(*) + |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY l.N +""".stripMargin), + (1, 1) :: + (2, 1) :: + (3, 1) :: + (4, 1) :: + (5, 1) :: + (6, 1) :: Nil) + +checkAnswer( + sql( + """ + |SELECT r.a, count(*) + |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) + |GROUP BY r.a +""".stripMargin), + (null, 6) :: Nil) } test("right outer join") { @@ -232,6 +257,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { (4, "d", 4, "D") :: (null, null, 5, "E") :: (null, null, 6, "F") :: Nil) + +// Make sure we are choosing right.outputPartitioning as the +// outputPartitioning for the outer join operator. +checkAnswer( + sql( + """ + |SELECT l.a, count(*) + |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY l.a +""".stripMargin), + (null, 6) :: Nil) + +checkAnswer( + sql( + """ + |SELECT r.N, count(*) + |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N) + |GROUP BY r.N +
git commit: [SPARK-2116] Load spark-defaults.conf from SPARK_CONF_DIR if set
Repository: spark Updated Branches: refs/heads/master 3822f33f3 - 0da07da53 [SPARK-2116] Load spark-defaults.conf from SPARK_CONF_DIR if set If the SPARK_CONF_DIR environment variable is set, search it for spark-defaults.conf. Author: Albert Chu ch...@llnl.gov Closes #1059 from chu11/SPARK-2116 and squashes the following commits: 9f3ac94 [Albert Chu] SPARK-2116: If SPARK_CONF_DIR environment variable is set, search it for spark-defaults.conf. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0da07da5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0da07da5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0da07da5 Branch: refs/heads/master Commit: 0da07da53e5466ec44c8050020cbc4b9957cb949 Parents: 3822f33 Author: Albert Chu ch...@llnl.gov Authored: Fri Aug 1 19:00:38 2014 -0700 Committer: Matei Zaharia ma...@databricks.com Committed: Fri Aug 1 19:00:46 2014 -0700 -- .../org/apache/spark/deploy/SparkSubmitArguments.scala | 11 +++ 1 file changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0da07da5/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index dd044e6..9391f24 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -86,6 +86,17 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { private def mergeSparkProperties(): Unit = { // Use common defaults file, if not specified by user if (propertiesFile == null) { + sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir => +val sep = File.separator +val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf" +val file = new File(defaultPath) +if (file.exists()) { + propertiesFile = file.getAbsolutePath +} + } +} + +if (propertiesFile == null) { sys.env.get("SPARK_HOME").foreach { sparkHome => val sep = File.separator val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
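The resolution order the patch introduces is: look for spark-defaults.conf under SPARK_CONF_DIR first, and fall back to SPARK_HOME/conf only when that variable is unset or the file is missing. A rough Python sketch of that lookup, illustrative only and not the actual SparkSubmitArguments code:

    import os

    def default_properties_file():
        # Try SPARK_CONF_DIR first, then SPARK_HOME/conf, mirroring the patch.
        candidates = []
        conf_dir = os.environ.get("SPARK_CONF_DIR")
        if conf_dir:
            candidates.append(os.path.join(conf_dir, "spark-defaults.conf"))
        spark_home = os.environ.get("SPARK_HOME")
        if spark_home:
            candidates.append(os.path.join(spark_home, "conf", "spark-defaults.conf"))
        for path in candidates:
            if os.path.isfile(path):
                return path
        return None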
git commit: [SPARK-2800]: Exclude scalastyle-output.xml Apache RAT checks
Repository: spark Updated Branches: refs/heads/master 0da07da53 - a38d3c9ef [SPARK-2800]: Exclude scalastyle-output.xml Apache RAT checks Author: GuoQiang Li wi...@qq.com Closes #1729 from witgo/SPARK-2800 and squashes the following commits: 13ca966 [GuoQiang Li] Add scalastyle-output.xml to .rat-excludes file Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a38d3c9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a38d3c9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a38d3c9e Branch: refs/heads/master Commit: a38d3c9efcc0386b52ac4f041920985ae7300e28 Parents: 0da07da Author: GuoQiang Li wi...@qq.com Authored: Fri Aug 1 19:35:16 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Fri Aug 1 19:35:16 2014 -0700 -- .rat-excludes | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a38d3c9e/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 372bc25..bccb043 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -55,3 +55,4 @@ dist/* .*ipr .*iws logs +.*scalastyle-output.xml
git commit: [SPARK-2764] Simplify daemon.py process structure
Repository: spark Updated Branches: refs/heads/master a38d3c9ef - e8e0fd691 [SPARK-2764] Simplify daemon.py process structure Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py. See https://github.com/mesos/spark/pull/563 for the original PR that added daemon.py, where I raise some issues with the current design. Author: Josh Rosen joshro...@apache.org Closes #1680 from JoshRosen/pyspark-daemon and squashes the following commits: 5abbcb9 [Josh Rosen] Replace magic number: 4 - EINTR 5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails. b79254d [Josh Rosen] Detect failed fork() calls; improve error logging. 282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems: 8554536 [Josh Rosen] Fix daemon's shutdown(); log shutdown reason. 4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death. e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8e0fd69 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8e0fd69 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8e0fd69 Branch: refs/heads/master Commit: e8e0fd691a06a2887fdcffb2217b96805ace0cb0 Parents: a38d3c9 Author: Josh Rosen joshro...@apache.org Authored: Fri Aug 1 19:38:21 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Aug 1 19:38:21 2014 -0700 -- .../spark/api/python/PythonWorkerFactory.scala | 10 +- python/pyspark/daemon.py| 179 --- 2 files changed, 79 insertions(+), 110 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 759cbe2..15fe8a9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Attempt to connect, restart and retry once if it fails try { -new Socket(daemonHost, daemonPort) +val socket = new Socket(daemonHost, daemonPort) +val launchStatus = new DataInputStream(socket.getInputStream).readInt() +if (launchStatus != 0) { + throw new IllegalStateException("Python daemon failed to launch worker") +} +socket } catch { case exc: SocketException => - logWarning("Python daemon unexpectedly quit, attempting to restart") + logWarning("Failed to open socket to Python daemon:", exc) + logWarning("Assuming that daemon unexpectedly quit, attempting to restart") stopDaemon() startDaemon() new Socket(daemonHost, daemonPort) http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/python/pyspark/daemon.py -- diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 8a5873d..9fde0dd 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -15,64 +15,39 @@ # limitations under the License.
# +import numbers import os import signal +import select import socket import sys import traceback -import multiprocessing -from ctypes import c_bool from errno import EINTR, ECHILD from socket import AF_INET, SOCK_STREAM, SOMAXCONN from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN from pyspark.worker import main as worker_main from pyspark.serializers import write_int -try: -POOLSIZE = multiprocessing.cpu_count() -except NotImplementedError: -POOLSIZE = 4 - -exit_flag = multiprocessing.Value(c_bool, False) - - -def should_exit(): -global exit_flag -return exit_flag.value - def compute_real_exit_code(exit_code): # SystemExit's code can be integer or string, but os._exit only accepts integers -import numbers if isinstance(exit_code, numbers.Integral): return exit_code else: return 1 -def worker(listen_sock): +def worker(sock): + """ + Called by a worker process after the fork(). + """ # Redirect stdout to stderr os.dup2(2, 1) sys.stdout
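In other words, the daemon now accepts each connection and forks the worker for it directly, with no pre-forked pool in between. A stripped-down Python sketch of that structure follows; it is illustrative only (the function names are hypothetical, and the real daemon.py additionally reports the worker's launch status, handles EINTR, reaps dead children, and manages shutdown):

    import os
    import socket

    def handle_worker(conn):
        # Stand-in for pyspark.worker.main: serve this one connection, then exit.
        conn.sendall(b"ok")
        conn.close()

    def serve(listen_sock):
        while True:
            conn, _ = listen_sock.accept()
            pid = os.fork()
            if pid == 0:
                # Child: become the worker for this connection.
                listen_sock.close()
                handle_worker(conn)
                os._exit(0)
            # Parent: close its copy of the connection and keep accepting.
            conn.close()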
git commit: Streaming mllib [SPARK-2438][MLLIB]
Repository: spark Updated Branches: refs/heads/master e8e0fd691 - f6a189930 Streaming mllib [SPARK-2438][MLLIB] This PR implements a streaming linear regression analysis, in which a linear regression model is trained online as new data arrive. The design is based on discussions with tdas and mengxr, in which we determined how to add this functionality in a general way, with minimal changes to existing libraries. __Summary of additions:__ _StreamingLinearAlgorithm_ - An abstract class for fitting generalized linear models online to streaming data, including training on (and updating) a model, and making predictions. _StreamingLinearRegressionWithSGD_ - Class and companion object for running streaming linear regression _StreamingLinearRegressionTestSuite_ - Unit tests _StreamingLinearRegression_ - Example use case: fitting a model online to data from one stream, and making predictions on other data __Notes__ - If this looks good, I can use the StreamingLinearAlgorithm class to easily implement other analyses that follow the same logic (Ridge, Lasso, Logistic, SVM). Author: Jeremy Freeman the.freeman@gmail.com Author: freeman the.freeman@gmail.com Closes #1361 from freeman-lab/streaming-mllib and squashes the following commits: 775ea29 [Jeremy Freeman] Throw error if user doesn't initialize weights 4086fee [Jeremy Freeman] Fixed current weight formatting 8b95b27 [Jeremy Freeman] Restored broadcasting 29f27ec [Jeremy Freeman] Formatting 8711c41 [Jeremy Freeman] Used return to avoid indentation 777b596 [Jeremy Freeman] Restored treeAggregate 74cf440 [Jeremy Freeman] Removed static methods d28cf9a [Jeremy Freeman] Added usage notes c3326e7 [Jeremy Freeman] Improved documentation 9541a41 [Jeremy Freeman] Merge remote-tracking branch 'upstream/master' into streaming-mllib 66eba5e [Jeremy Freeman] Fixed line lengths 2fe0720 [Jeremy Freeman] Minor cleanup 7d51378 [Jeremy Freeman] Moved streaming loader to MLUtils b9b69f6 [Jeremy Freeman] Added setter methods c3f8b5a [Jeremy Freeman] Modified logging 00aafdc [Jeremy Freeman] Add modifiers 14b801e [Jeremy Freeman] Name changes c7d38a3 [Jeremy Freeman] Move check for empty data to GradientDescent 4b0a5d3 [Jeremy Freeman] Cleaned up tests 74188d6 [Jeremy Freeman] Eliminate dependency on commons 50dd237 [Jeremy Freeman] Removed experimental tag 6bfe1e6 [Jeremy Freeman] Fixed imports a2a63ad [freeman] Makes convergence test more robust 86220bc [freeman] Streaming linear regression unit tests fb4683a [freeman] Minor changes for scalastyle consistency fd31e03 [freeman] Changed logging behavior 453974e [freeman] Fixed indentation c4b1143 [freeman] Streaming linear regression 604f4d7 [freeman] Expanded private class to include mllib d99aa85 [freeman] Helper methods for streaming MLlib apps 0898add [freeman] Added dependency on streaming Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6a18993 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6a18993 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6a18993 Branch: refs/heads/master Commit: f6a1899306c5ad766fea122d3ab4b83436d9f6fd Parents: e8e0fd6 Author: Jeremy Freeman the.freeman@gmail.com Authored: Fri Aug 1 20:10:26 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Aug 1 20:10:26 2014 -0700 -- .../mllib/StreamingLinearRegression.scala | 73 ++ mllib/pom.xml | 5 + .../mllib/optimization/GradientDescent.scala| 9 ++ .../mllib/regression/LinearRegression.scala | 4 +- 
.../regression/StreamingLinearAlgorithm.scala | 106 +++ .../StreamingLinearRegressionWithSGD.scala | 88 .../org/apache/spark/mllib/util/MLUtils.scala | 15 +++ .../StreamingLinearRegressionSuite.scala| 135 +++ 8 files changed, 433 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6a18993/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala new file mode 100644 index 000..1fd37ed --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not
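Conceptually, the streaming model keeps a current weight vector, performs a mini-batch gradient step on every new batch that arrives, and then predicts with the latest weights. A small NumPy sketch of that per-batch update, conceptual only and not the MLlib implementation (batches are assumed to be lists of (label, features) pairs):

    import numpy as np

    def sgd_batch_update(weights, batch, step_size=0.1):
        # One least-squares gradient step over the incoming mini-batch.
        labels = np.array([y for y, _ in batch])
        features = np.array([x for _, x in batch])
        residuals = features.dot(weights) - labels
        gradient = features.T.dot(residuals) / len(batch)
        return weights - step_size * gradient

    weights = np.zeros(2)
    for batch in [[(1.0, [1.0, 0.5]), (2.0, [2.0, 1.0])],
                  [(0.5, [0.5, 0.2])]]:
        weights = sgd_batch_update(weights, batch)  # model updated as data arrive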
git commit: [SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercept in pyspark's linear methods.
Repository: spark Updated Branches: refs/heads/master f6a189930 - c28118922 [SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercept in pyspark's linear methods. Related to issue: [SPARK-2550](https://issues.apache.org/jira/browse/SPARK-2550?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Major%20ORDER%20BY%20key%20DESC). Author: Michael Giannakopoulos miccagi...@gmail.com Closes #1624 from miccagiann/new-branch and squashes the following commits: c02e5f5 [Michael Giannakopoulos] Merge cleanly with upstream/master. 8dcb888 [Michael Giannakopoulos] Putting the if/else if statements in brackets. fed8eaa [Michael Giannakopoulos] Adding a space in the message related to the IllegalArgumentException. 44e6ff0 [Michael Giannakopoulos] Adding a blank line before python class LinearRegressionWithSGD. 8eba9c5 [Michael Giannakopoulos] Change function signatures. Exception is thrown from the scala component and not from the python one. 638be47 [Michael Giannakopoulos] Modified code to comply with code standards. ec50ee9 [Michael Giannakopoulos] Shorten the if-elif-else statement in regression.py file b962744 [Michael Giannakopoulos] Replaced the enum classes, with strings-keywords for defining the values of 'regType' parameter. 78853ec [Michael Giannakopoulos] Providing intercept and regualizer functionallity for linear methods in only one function. 3ac8874 [Michael Giannakopoulos] Added support for regularizer and intercection parameters for linear regression method. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2811892 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2811892 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2811892 Branch: refs/heads/master Commit: c281189222e645d2c87277c269e2102c3c8ccc95 Parents: f6a1899 Author: Michael Giannakopoulos miccagi...@gmail.com Authored: Fri Aug 1 21:00:31 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Aug 1 21:00:31 2014 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 28 - python/pyspark/mllib/regression.py | 32 +--- 2 files changed, 49 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c2811892/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 122925d..7d91273 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -23,6 +23,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ +import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors} import org.apache.spark.mllib.random.{RandomRDDGenerators = RG} import org.apache.spark.mllib.recommendation._ @@ -252,15 +254,27 @@ class PythonMLLibAPI extends Serializable { numIterations: Int, stepSize: Double, miniBatchFraction: Double, - initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { + initialWeightsBA: Array[Byte], + regParam: Double, + regType: String, + intercept: Boolean): 
java.util.List[java.lang.Object] = { +val lrAlg = new LinearRegressionWithSGD() +lrAlg.setIntercept(intercept) +lrAlg.optimizer + .setNumIterations(numIterations) + .setRegParam(regParam) + .setStepSize(stepSize) +if (regType == "l2") { + lrAlg.optimizer.setUpdater(new SquaredL2Updater) +} else if (regType == "l1") { + lrAlg.optimizer.setUpdater(new L1Updater) +} else if (regType != "none") { + throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter. " ++ "Can only be initialized using the following string values: [l1, l2, none].") +} trainRegressionModel( (data, initialWeights) => -LinearRegressionWithSGD.train( - data, - numIterations, - stepSize, - miniBatchFraction, - initialWeights), +lrAlg.run(data, initialWeights), dataBytesJRDD, initialWeightsBA) } http://git-wip-us.apache.org/repos/asf/spark/blob/c2811892/python/pyspark/mllib/regression.py -- diff --git
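For reference, a hypothetical PySpark call using the new options. The keyword names follow the commit description (regParam, regType in {"l1", "l2", "none"}, intercept), but treat the exact Python signature as an assumption rather than the documented API:

    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    sc = SparkContext("local", "regularized-lr")  # illustrative context
    data = sc.parallelize([
        LabeledPoint(0.0, [0.0, 1.0]),
        LabeledPoint(1.0, [1.0, 0.0]),
        LabeledPoint(2.0, [2.0, 1.0]),
    ])

    # L2-regularized linear regression with an intercept term.
    model = LinearRegressionWithSGD.train(data, iterations=100, step=0.1,
                                          regParam=0.01, regType="l2", intercept=True)
    print(model.weights, model.intercept)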
git commit: [SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGenerator. RandomRDD is now of generic type
Repository: spark Updated Branches: refs/heads/master e25ec0617 - fda475987 [SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGenerator. RandomRDD is now of generic type The RandomRDDGenerators used to only output RDD[Double]. Now RandomRDDGenerators.randomRDD can be used to generate a random RDD[T] via a class that extends RandomDataGenerator, by supplying a type T and overriding the nextValue() function as they wish. Author: Burak brk...@gmail.com Closes #1732 from brkyvz/SPARK-2801 and squashes the following commits: c94a694 [Burak] [SPARK-2801][MLlib] Missing ClassTags added 22d96fe [Burak] [SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGenerator, generic types added for RandomRDD instead of Double Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fda47598 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fda47598 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fda47598 Branch: refs/heads/master Commit: fda475987f3b8b37d563033b0e45706ce433824a Parents: e25ec06 Author: Burak brk...@gmail.com Authored: Fri Aug 1 22:32:12 2014 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Fri Aug 1 22:32:12 2014 -0700 -- .../mllib/random/DistributionGenerator.scala| 101 --- .../mllib/random/RandomDataGenerator.scala | 101 +++ .../mllib/random/RandomRDDGenerators.scala | 32 +++--- .../org/apache/spark/mllib/rdd/RandomRDD.scala | 34 --- .../random/DistributionGeneratorSuite.scala | 90 - .../mllib/random/RandomDataGeneratorSuite.scala | 90 + .../mllib/random/RandomRDDGeneratorsSuite.scala | 8 +- 7 files changed, 231 insertions(+), 225 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fda47598/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala deleted file mode 100644 index 7ecb409..000 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.random - -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand - -import org.apache.spark.annotation.Experimental -import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} - -/** - * :: Experimental :: - * Trait for random number generators that generate i.i.d. values from a distribution. - */ -@Experimental -trait DistributionGenerator extends Pseudorandom with Serializable { - - /** - * Returns an i.i.d. sample as a Double from an underlying distribution. 
- */ - def nextValue(): Double - - /** - * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the - * class when applicable for non-locking concurrent usage. - */ - def copy(): DistributionGenerator -} - -/** - * :: Experimental :: - * Generates i.i.d. samples from U[0.0, 1.0] - */ -@Experimental -class UniformGenerator extends DistributionGenerator { - - // XORShiftRandom for better performance. Thread safety isn't necessary here. - private val random = new XORShiftRandom() - - override def nextValue(): Double = { -random.nextDouble() - } - - override def setSeed(seed: Long) = random.setSeed(seed) - - override def copy(): UniformGenerator = new UniformGenerator() -} - -/** - * :: Experimental :: - * Generates i.i.d. samples from the standard normal distribution. - */ -@Experimental -class StandardNormalGenerator extends DistributionGenerator { - - // XORShiftRandom for better performance. Thread safety isn't necessary here. - private val random = new XORShiftRandom() - - override def nextValue(): Double = { - random.nextGaussian() - } - - override def setSeed(seed: Long) =
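The contract a custom generator fulfils is small: pick the value type, override nextValue(), and supply setSeed() and copy() so each partition can work with its own seeded instance. The trait itself is Scala; the following Python-flavored sketch only illustrates that contract with a hypothetical string generator:

    import random

    class StringDataGenerator(object):
        # Hypothetical generator of i.i.d. random strings, mirroring the
        # nextValue()/setSeed()/copy() contract described above.
        def __init__(self, alphabet="abc", length=4):
            self.alphabet = alphabet
            self.length = length
            self._rng = random.Random()

        def nextValue(self):
            return "".join(self._rng.choice(self.alphabet) for _ in range(self.length))

        def setSeed(self, seed):
            self._rng.seed(seed)

        def copy(self):
            return StringDataGenerator(self.alphabet, self.length)

    gen = StringDataGenerator()
    gen.setSeed(42)
    samples = [gen.nextValue() for _ in range(3)]  # three i.i.d. random strings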