spark git commit: [SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row

2015-10-22 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master f6d06adf0 -> 42d225f44


[SPARK-11216][SQL][FOLLOW-UP] add encoder/decoder for external row

address comments in https://github.com/apache/spark/pull/9184

Author: Wenchen Fan 

Closes #9212 from cloud-fan/encoder.
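
For orientation, here is a minimal usage sketch of the factory this change touches,
in spark-shell style. It is illustrative only: the encoder API is catalyst-internal
and still in flux, so the exact call sites are assumptions rather than part of this
commit; the names (`RowEncoder`, `toRow`, `fromRow`, `CreateExternalRow`) come from
the diff below.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Build a ClassEncoder[Row] for a concrete schema, per the scaladoc added below.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))
val encoder = RowEncoder(schema)

// toRow converts an external Row into the internal representation; fromRow
// (which now constructs a CreateExternalRow expression) goes the other way.
val internal = encoder.toRow(Row("alice", 30))
```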


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d225f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d225f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d225f4

Branch: refs/heads/master
Commit: 42d225f449c633be7465493c57b9881303ee14ba
Parents: f6d06ad
Author: Wenchen Fan 
Authored: Thu Oct 22 10:53:59 2015 -0700
Committer: Michael Armbrust 
Committed: Thu Oct 22 10:53:59 2015 -0700

--
 .../spark/sql/catalyst/encoders/ClassEncoder.scala| 14 +++---
 .../spark/sql/catalyst/encoders/RowEncoder.scala  |  9 ++---
 .../spark/sql/catalyst/expressions/objects.scala  |  8 +++-
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala |  2 +-
 4 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
index f3a1063..54096f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ClassEncoder.scala
@@ -48,20 +48,12 @@ case class ClassEncoder[T](
   private val dataType = ObjectType(clsTag.runtimeClass)
 
   override def toRow(t: T): InternalRow = {
-if (t == null) {
-  null
-} else {
-  inputRow(0) = t
-  extractProjection(inputRow)
-}
+inputRow(0) = t
+extractProjection(inputRow)
   }
 
   override def fromRow(row: InternalRow): T = {
-if (row eq null) {
-  null.asInstanceOf[T]
-} else {
-  constructProjection(row).get(0, dataType).asInstanceOf[T]
-}
+constructProjection(row).get(0, dataType).asInstanceOf[T]
   }
 
   override def bind(schema: Seq[Attribute]): ClassEncoder[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 3e74aab..5142856 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -26,8 +26,11 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * A factory for constructing encoders that convert external row to/from the 
Spark SQL
+ * internal binary representation.
+ */
 object RowEncoder {
-
   def apply(schema: StructType): ClassEncoder[Row] = {
 val cls = classOf[Row]
 val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -136,7 +139,7 @@ object RowEncoder {
 constructorFor(BoundReference(i, f.dataType, f.nullable), f.dataType)
   )
 }
-CreateRow(fields)
+CreateExternalRow(fields)
   }
 
   private def constructorFor(input: Expression, dataType: DataType): 
Expression = dataType match {
@@ -195,7 +198,7 @@ object RowEncoder {
   Literal.create(null, externalDataTypeFor(f.dataType)),
   constructorFor(getField(input, i, f.dataType), f.dataType))
   }
-  CreateRow(convertedFields)
+  CreateExternalRow(convertedFields)
   }
 
   private def getField(

http://git-wip-us.apache.org/repos/asf/spark/blob/42d225f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
index 8fc00ad..b42d6c5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
@@ -456,7 +456,13 @@ case class MapObjects(
   }
 }
 
-case class CreateRow(children: Seq[Expression]) extends Expression {

spark git commit: [SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted

2015-10-22 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 42d225f44 -> 7bb6d31cf


[SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send 
won't be interrupted

The current `NettyRpcEndpointRef.send` can be interrupted because it uses 
`LinkedBlockingQueue.put`, which may hang the application.

Imagine the following execution order:

|   | thread 1: TaskRunner.kill | thread 2: TaskRunner.run |
| - | - | - |
| 1 | killed = true | |
| 2 | | if (killed) { |
| 3 | | throw new TaskKilledException |
| 4 | | case _: TaskKilledException | _: InterruptedException if task.killed => |
| 5 | task.kill(interruptThread): interruptThread is true | |
| 6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) |
| 7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend |

Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will 
throw `InterruptedException`. This will prevent the executor from updating the 
task status and hang the application.

A failure caused by the above issue can be seen here:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull

Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use 
`LinkedBlockingQueue.offer` to resolve this issue.
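
The behavioural difference at the heart of the fix, shown on a plain
`LinkedBlockingQueue` (a standalone sketch, not Spark code):

```scala
import java.util.concurrent.LinkedBlockingQueue

object OfferVsPut {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()

    // put() acquires its lock interruptibly, so it throws InterruptedException
    // when the calling thread is already interrupted -- even on an unbounded queue.
    Thread.currentThread().interrupt()
    try {
      queue.put("A")
    } catch {
      case _: InterruptedException => println("put() was interrupted")
    }

    // offer() never blocks on an unbounded queue and does not check the interrupt
    // flag, so the status update is always enqueued.
    println(s"offer() succeeded: ${queue.offer("B")}")
  }
}
```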

Author: zsxwing 

Closes #9198 from zsxwing/dont-interrupt-send.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb6d31c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb6d31c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb6d31c

Branch: refs/heads/master
Commit: 7bb6d31cff279776f90744407291682774cfe1c2
Parents: 42d225f
Author: zsxwing 
Authored: Thu Oct 22 11:31:47 2015 -0700
Committer: Marcelo Vanzin 
Committed: Thu Oct 22 11:31:47 2015 -0700

--
 .../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7bb6d31c/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index f1a8273..7bf44a6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
   }
   val data = endpoints.get(name)
   endpointRefs.put(data.endpoint, data.ref)
-  receivers.put(data)  // for the OnStart message
+  receivers.offer(data)  // for the OnStart message
 }
 endpointRef
   }
@@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 val data = endpoints.remove(name)
 if (data != null) {
   data.inbox.stop()
-  receivers.put(data)  // for the OnStop message
+  receivers.offer(data)  // for the OnStop message
 }
 // Don't clean `endpointRefs` here because it's possible that some 
messages are being processed
 // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be 
cleaned in Inbox via
@@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 true
   } else {
 data.inbox.post(createMessageFn(data.ref))
-receivers.put(data)
+receivers.offer(data)
 false
   }
 }
@@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 // Stop all endpoints. This will queue all endpoints for processing by the 
message loops.
 endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
 // Enqueue a message that tells the message loops to stop.
-receivers.put(PoisonPill)
+receivers.offer(PoisonPill)
 threadpool.shutdown()
   }
 
@@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
 val data = receivers.take()
 if (data == PoisonPill) {
   // Put PoisonPill back so that other MessageLoops can see it.
-  receivers.put(PoisonPill)
+  receivers.offer(PoisonPill)
   return
 }
 data.inbox.process(Dispatcher.this)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11121][CORE] Correct the TaskLocation type

2015-10-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 1d9733271 -> c03b6d115


[SPARK-11121][CORE] Correct the TaskLocation type

Correct the logic to return an `HDFSCacheTaskLocation` instance when the input
`str` is an in-memory location.
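
A condensed sketch of the corrected parsing logic (the names mirror the real
`TaskLocation` object, but this is a simplified stand-in, not the patched file):

```scala
sealed trait TaskLocation { def host: String }
case class HostTaskLocation(host: String) extends TaskLocation
case class HDFSCacheTaskLocation(host: String) extends TaskLocation

object TaskLocation {
  // Locations prefixed with "hdfs_cache_" refer to blocks cached in memory by HDFS.
  val inMemoryLocationTag = "hdfs_cache_"

  def apply(str: String): TaskLocation = {
    val hstr = str.stripPrefix(inMemoryLocationTag)
    if (hstr == str) {
      HostTaskLocation(str)          // a plain host name
    } else {
      HDFSCacheTaskLocation(hstr)    // previously this branch returned HostTaskLocation
    }
  }
}

// TaskLocation("host1")            == HostTaskLocation("host1")
// TaskLocation("hdfs_cache_host1") == HDFSCacheTaskLocation("host1")
```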

Author: zhichao.li 

Closes #9096 from zhichao-li/uselessBranch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c03b6d11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c03b6d11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c03b6d11

Branch: refs/heads/master
Commit: c03b6d11589102b91f08728519e8520025db91e1
Parents: 1d97332
Author: zhichao.li 
Authored: Thu Oct 22 03:59:26 2015 -0700
Committer: Sean Owen 
Committed: Thu Oct 22 03:59:26 2015 -0700

--
 .../scala/org/apache/spark/scheduler/TaskLocation.scala  |  2 +-
 .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 11 ---
 2 files changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c03b6d11/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index da07ce2..1b65926 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -67,7 +67,7 @@ private[spark] object TaskLocation {
 if (hstr.equals(str)) {
   new HostTaskLocation(str)
 } else {
-  new HostTaskLocation(hstr)
+  new HDFSCacheTaskLocation(hstr)
 }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c03b6d11/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f0eadf2..695523c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -759,9 +759,9 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 val sched = new FakeTaskScheduler(sc,
 ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
 val taskSet = FakeTask.createTaskSet(3,
-  Seq(HostTaskLocation("host1")),
-  Seq(HostTaskLocation("host2")),
-  Seq(HDFSCacheTaskLocation("host3")))
+  Seq(TaskLocation("host1")),
+  Seq(TaskLocation("host2")),
+  Seq(TaskLocation("hdfs_cache_host3")))
 val clock = new ManualClock
 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
 assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, 
NODE_LOCAL, ANY)))
@@ -776,6 +776,11 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(manager.myLocalityLevels.sameElements(Array(ANY)))
   }
 
+  test("Test TaskLocation for different host type.") {
+assert(TaskLocation("host1") === HostTaskLocation("host1"))
+assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
 val valueSer = SparkEnv.get.serializer.newInstance()
 new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new 
TaskMetrics)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9735][SQL] Respect the user specified schema than the infer partition schema for HadoopFsRelation

2015-10-22 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 3535b91dd -> d4950e6be


[SPARK-9735][SQL] Respect the user specified schema than the infer partition 
schema for HadoopFsRelation

This enables the unit test `hadoopFsRelationSuite.Partition column type casting`,
which previously threw the exception below because the auto-inferred partition
schema was given higher priority than the user-specified one.

```
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206)
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
```
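
The scenario behind the traces above, sketched in spark-shell style; the path,
schema, and column names here are illustrative assumptions, not taken from the patch:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Data laid out as /tmp/t/p=1/... and /tmp/t/p=2/..., so partition discovery
// infers column `p` as an integer.
val userSchema = StructType(Seq(
  StructField("value", LongType),
  StructField("p", StringType)))   // the user declares the partition column a string

// Before this fix the inferred integer type won over the user-specified StringType,
// and reading the column as a string produced the ClassCastException above.
val df = sqlContext.read.schema(userSchema).parquet("/tmp/t")
df.select("p").collect()
```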

spark git commit: [SPARK-11116][SQL] First Draft of Dataset API

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 188ea348f -> 53e83a3a7


[SPARK-11116][SQL] First Draft of Dataset API

*This PR adds a new experimental API to Spark, tentatively named Datasets.*

A `Dataset` is a strongly-typed collection of objects that can be transformed 
in parallel using functional or relational operations.  Example usage is as 
follows:

### Functional
```scala
> val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
> ds.filter(_ % 1 == 0).collect()
res1: Array[Int] = Array(1, 2, 3)
```

### Relational
```scala
scala> ds.toDF().show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

> ds.select(expr("value + 1").as[Int]).collect()
res11: Array[Int] = Array(2, 3, 4)
```

## Comparison to RDDs
 A `Dataset` differs from an `RDD` in the following ways:
  - The creation of a `Dataset` requires the presence of an explicit `Encoder` 
that can be
used to serialize the object into a binary format.  Encoders are also 
capable of mapping the
schema of a given object to the Spark SQL type system.  In contrast, RDDs 
rely on runtime
reflection based serialization.
  - Internally, a `Dataset` is represented by a Catalyst logical plan and the 
data is stored
in the encoded form.  This representation allows for additional logical 
operations and
enables many operations (sorting, shuffling, etc.) to be performed without 
deserializing to
an object.

A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.

## Comparison to DataFrames

A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a
specific JVM object type, instead of to a generic `Row` container. A DataFrame can be
transformed into a specific Dataset by calling `df.as[ElementType]`. Similarly, you can
transform a strongly-typed `Dataset` into a generic DataFrame by calling `ds.toDF()`.
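
A short round trip between the two APIs, following the conversions described above
(spark-shell style; assumes `sqlContext.implicits._` is imported and this preview
API is available):

```scala
case class Person(name: String, age: Int)

val df = Seq(Person("alice", 30), Person("bob", 25)).toDF()  // generic Row container
val ds = df.as[Person]                                       // bind rows to a JVM type
val names = ds.map(_.name)                                   // typed, functional operation
val backToDf = ds.toDF()                                     // and back to a DataFrame
```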

## Implementation Status and TODOs

This is a rough cut at the least controversial parts of the API.  The primary 
purpose here is to get something committed so that we can better parallelize 
further work and get early feedback on the API.  The following is being 
deferred to future PRs:
 - Joins and Aggregations (prototype here 
https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
 - Support for Java

Additionally, binding an encoder to a given schema is
currently done in a fairly ad-hoc fashion.  This is an internal detail, and 
what we are doing today works for the cases we care about.  However, as we add 
more APIs we'll probably need to do this in a more principled way (i.e. 
separate resolution from binding as we do in DataFrames).

## COMPATIBILITY NOTE
Long term we plan to make `DataFrame` extend `Dataset[Row]`.  However,
making this change to the class hierarchy would break the function signatures 
for the existing
function operations (map, flatMap, etc).  As such, this class should be 
considered a preview
of the final API.  Changes will be made to the interface after Spark 1.6.

Author: Michael Armbrust 

Closes #9190 from marmbrus/dataset-infra.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e83a3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e83a3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e83a3a

Branch: refs/heads/master
Commit: 53e83a3a77cafc2ccd0764ecdb8b3ba735bc51fc
Parents: 188ea34
Author: Michael Armbrust 
Authored: Thu Oct 22 15:20:17 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 15:20:17 2015 -0700

--
 .../spark/sql/catalyst/ScalaReflection.scala|   8 +-
 .../sql/catalyst/encoders/ClassEncoder.scala|  38 +-
 .../spark/sql/catalyst/encoders/Encoder.scala   |  19 +-
 .../sql/catalyst/encoders/ProductEncoder.scala  |  12 +-
 .../sql/catalyst/encoders/primitiveTypes.scala  | 100 +
 .../spark/sql/catalyst/encoders/tuples.scala| 173 
 .../sql/catalyst/expressions/AttributeMap.scala |   7 +
 .../sql/catalyst/expressions/AttributeSet.scala |   4 +
 .../expressions/complexTypeCreator.scala|   8 +
 .../sql/catalyst/expressions/package.scala  |  12 +
 .../catalyst/plans/logical/basicOperators.scala |  72 +++-
 .../encoders/PrimitiveEncoderSuite.scala|  43 ++
 .../catalyst/encoders/ProductEncoderSuite.scala |  21 +-
 .../scala/org/apache/spark/sql/Column.scala |  15 +
 .../scala/org/apache/spark/sql/DataFrame.scala  |  11 +
 .../scala/org/apache/spark/sql/Dataset.scala| 392 +++
 .../org/apache/spark/sql/DatasetHolder.scala|  30 ++
 .../org/apache/spark/sql/GroupedDataset.scala   |  68 
 .../scala/org/apache/spark/sql/SQLContext.scala |  12 +
 .../org/apache/spark/sql/SQLImplicits.scala |  16 +-
 .../spark/sql/execution/GroupedIterator.scala   | 141 +++
 

[2/2] spark git commit: [SPARK-10812] [YARN] Fix shutdown of token renewer.

2015-10-22 Thread vanzin
[SPARK-10812] [YARN] Fix shutdown of token renewer.

A recent change to fix the referenced bug caused this exception in
the `SparkContext.stop()` path:

org.apache.spark.SparkException: YarnSparkHadoopUtil is not available in non-YARN mode!
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:167)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:182)
    at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:440)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1579)
    at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1730)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1729)

Author: Marcelo Vanzin 

Closes #8996 from vanzin/SPARK-10812.

(cherry picked from commit 4b74755122d51edb1257d4f3785fb24508681068)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e405c2a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e405c2a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e405c2a1

Branch: refs/heads/branch-1.5
Commit: e405c2a1f6c75b50324de1bd18363b031d34f3d0
Parents: c49e0c3
Author: Marcelo Vanzin 
Authored: Wed Oct 7 11:38:07 2015 -0700
Committer: Marcelo Vanzin 
Committed: Thu Oct 22 13:14:26 2015 -0700

--
 .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e405c2a1/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d06d951..36d5759 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -178,8 +178,8 @@ private[spark] class YarnClientSchedulerBackend(
   monitorThread.stopMonitor()
 }
 super.stop()
-client.stop()
 YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
+client.stop()
 logInfo("Stopped")
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: [SPARK-10812] [YARN] Spark hadoop util support switching to yarn

2015-10-22 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f9ad0e543 -> e405c2a1f


[SPARK-10812] [YARN] Spark hadoop util support switching to yarn

While this is likely not a huge issue for real production systems, it can be a problem
for test systems that set up a Spark Context, tear it down, and then stand up a Spark
Context with a different master (e.g. some local mode and some yarn mode tests).
Discovered during work on spark-testing-base on Spark 1.4.1, but the logic that triggers
it seems to be present in master as well (see the SparkHadoopUtil object). A valid
workaround for users encountering this issue is to fork a different JVM, however this
can be heavyweight.

```
[info] SampleMiniClusterTest:
[info] Exception encountered when attempting to run a suite with class name: com.holdenkarau.spark.testing.SampleMiniClusterTest *** ABORTED ***
[info] java.lang.ClassCastException: org.apache.spark.deploy.SparkHadoopUtil cannot be cast to org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
[info]   at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:163)
[info]   at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:257)
[info]   at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)
[info]   at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)
[info]   at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
[info]   at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
[info]   at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
[info]   at com.holdenkarau.spark.testing.SharedMiniCluster$class.setup(SharedMiniCluster.scala:186)
[info]   at com.holdenkarau.spark.testing.SampleMiniClusterTest.setup(SampleMiniClusterTest.scala:26)
[info]   at com.holdenkarau.spark.testing.SharedMiniCluster$class.beforeAll(SharedMiniCluster.scala:103)
```

Author: Holden Karau 

Closes #8911 from 
holdenk/SPARK-10812-spark-hadoop-util-support-switching-to-yarn.

(cherry picked from commit d8d50ed388d2e695b69d2b93a620045ef2f0bc18)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c49e0c3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c49e0c3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c49e0c3f

Branch: refs/heads/branch-1.5
Commit: c49e0c3f6d25aa15b7cc25db0e9ae5a869184480
Parents: f9ad0e5
Author: Holden Karau 
Authored: Mon Sep 28 06:33:45 2015 -0700
Committer: Marcelo Vanzin 
Committed: Thu Oct 22 13:14:21 2015 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   |  2 ++
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 30 ++--
 .../org/apache/spark/deploy/yarn/Client.scala   |  6 +++-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 12 
 4 files changed, 34 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 011e19f..2a2fa75 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1750,6 +1750,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
   SparkEnv.set(null)
 }
+// Unset YARN mode system env variable, to allow switching between cluster 
types.
+System.clearProperty("SPARK_YARN_MODE")
 SparkContext.clearActiveContext()
 logInfo("Successfully stopped SparkContext")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c49e0c3f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index dda4216..1157ee0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -380,20 +380,13 @@ class SparkHadoopUtil extends Logging {
 
 object SparkHadoopUtil {
 
-  private val hadoop = {
-val yarnMode = java.lang.Boolean.valueOf(
-System.getProperty("SPARK_YARN_MODE", 
System.getenv("SPARK_YARN_MODE")))
-if (yarnMode) {
-  try {
-Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
-  .newInstance()
-  .asInstanceOf[SparkHadoopUtil]
-  } catch {
-   case e: Exception => throw new 

spark git commit: [SPARK-11242][SQL] In conf/spark-env.sh.template SPARK_DRIVER_MEMORY is documented incorrectly

2015-10-22 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d4950e6be -> 188ea348f


[SPARK-11242][SQL] In conf/spark-env.sh.template SPARK_DRIVER_MEMORY is 
documented incorrectly

Minor fix on the comment

Author: guoxi 

Closes #9201 from xguo27/SPARK-11242.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/188ea348
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/188ea348
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/188ea348

Branch: refs/heads/master
Commit: 188ea348fdcf877d86f3c433cd15f6468fe3b42a
Parents: d4950e6
Author: guoxi 
Authored: Thu Oct 22 13:56:18 2015 -0700
Committer: Sean Owen 
Committed: Thu Oct 22 13:56:18 2015 -0700

--
 conf/spark-env.sh.template | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/188ea348/conf/spark-env.sh.template
--
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 990ded4..771251f 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -36,10 +36,10 @@
 
 # Options read in YARN client mode
 # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
-# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
-# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
-# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
-# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
+# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
+# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
 # - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
 # - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests 
(Default: ‘default’)
 # - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed 
with the job.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7021] Add JUnit output for Python unit tests

2015-10-22 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 53e83a3a7 -> 163d53e82


[SPARK-7021] Add JUnit output for Python unit tests

WIP

Author: Gábor Lipták 

Closes #8323 from gliptak/SPARK-7021.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/163d53e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/163d53e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/163d53e8

Branch: refs/heads/master
Commit: 163d53e829c166f061589cc379f61642d4c9a40f
Parents: 53e83a3
Author: Gábor Lipták 
Authored: Thu Oct 22 15:27:11 2015 -0700
Committer: Davies Liu 
Committed: Thu Oct 22 15:27:11 2015 -0700

--
 python/pyspark/ml/tests.py|  9 -
 python/pyspark/mllib/tests.py |  9 -
 python/pyspark/sql/tests.py   |  9 -
 python/pyspark/streaming/tests.py | 11 ++-
 python/pyspark/tests.py   | 19 ++-
 5 files changed, 48 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 6a2577d..7a16cf5 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -20,6 +20,10 @@ Unit tests for Spark ML Python APIs.
 """
 
 import sys
+try:
+import xmlrunner
+except ImportError:
+xmlrunner = None
 
 if sys.version_info[:2] <= (2, 6):
 try:
@@ -368,4 +372,7 @@ class CrossValidatorTests(PySparkTestCase):
 
 
 if __name__ == "__main__":
-unittest.main()
+if xmlrunner:
+
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+else:
+unittest.main()

http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 2ad69a0..f8e8e0e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -31,6 +31,10 @@ from numpy import (
 from numpy import sum as array_sum
 
 from py4j.protocol import Py4JJavaError
+try:
+import xmlrunner
+except ImportError:
+xmlrunner = None
 
 if sys.version > '3':
 basestring = str
@@ -1538,7 +1542,10 @@ class MLUtilsTests(MLlibTestCase):
 if __name__ == "__main__":
 if not _have_scipy:
 print("NOTE: Skipping SciPy tests as it does not seem to be installed")
-unittest.main()
+if xmlrunner:
+
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+else:
+unittest.main()
 if not _have_scipy:
 print("NOTE: SciPy tests were skipped as it does not seem to be 
installed")
 sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f465e1f..6356d4b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -31,6 +31,10 @@ import time
 import datetime
 
 import py4j
+try:
+import xmlrunner
+except ImportError:
+xmlrunner = None
 
 if sys.version_info[:2] <= (2, 6):
 try:
@@ -1222,4 +1226,7 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 
 
 if __name__ == "__main__":
-unittest.main()
+if xmlrunner:
+
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+else:
+unittest.main()

http://git-wip-us.apache.org/repos/asf/spark/blob/163d53e8/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 4963425..2c908da 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -27,6 +27,11 @@ import struct
 import shutil
 from functools import reduce
 
+try:
+import xmlrunner
+except ImportError:
+xmlrunner = None
+
 if sys.version_info[:2] <= (2, 6):
 try:
 import unittest2 as unittest
@@ -1303,4 +1308,8 @@ if __name__ == "__main__":
 for testcase in testcases:
 sys.stderr.write("[Running %s]\n" % (testcase))
 tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
-unittest.TextTestRunner(verbosity=3).run(tests)
+if xmlrunner:
+unittest.main(tests, verbosity=3,
+  
testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
+else:
+unittest.TextTestRunner(verbosity=3).run(tests)


spark git commit: [SPARK-11251] Fix page size calculation in local mode

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e405c2a1f -> a76cf51ed


[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```

Author: Andrew Or 

Closes #9209 from andrewor14/fix-local-page-size.

(cherry picked from commit 34e71c6d89c1f2b6236dbf0d75cd12da08003c84)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76cf51e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76cf51e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76cf51e

Branch: refs/heads/branch-1.5
Commit: a76cf51ed91d99c88f301ec85f3cda1288bcf346
Parents: e405c2a
Author: Andrew Or 
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 15:58:17 2015 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  4 +-
 .../OutputCommitCoordinatorSuite.scala  |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a76cf51e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2a2fa75..a8f6047 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   conf: SparkConf,
   isLocal: Boolean,
   listenerBus: LiveListenerBus): SparkEnv = {
-SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+SparkEnv.createDriverEnv(conf, isLocal, listenerBus, 
SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2548,24 +2548,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 
otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+def convertToInt(threads: String): Int = {
+  if (threads == "*") Runtime.getRuntime.availableProcessors() else 
threads.toInt
+}
+master match {
+  case "local" => 1
+  case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+  case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => 
convertToInt(threads)
+  case _ => 0 // driver is not used for execution
+}
+  }
+
+  /**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
   private def createTaskScheduler(
   sc: SparkContext,
   master: String): (SchedulerBackend, TaskScheduler) = {
-// Regular expression used for local[N] and local[*] master formats
-val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-// Regular expression for local[N, maxRetries], used in tests with failing 
tasks
-val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-// Regular expression for simulating a Spark cluster of [N, cores, memory] 
locally
-val LOCAL_CLUSTER_REGEX = 
"""local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-// Regular expression for connecting to Spark deploy clusters
-val SPARK_REGEX = """spark://(.*)""".r
-// Regular expression for connection to Mesos cluster by mesos:// or zk:// 
url
-val MESOS_REGEX = """(mesos|zk)://.*""".r
-// Regular expression for connection to Simr cluster
-val SIMR_REGEX = """simr://(.*)""".r
+import SparkMasterRegex._
 
 // When running locally, don't try to re-execute tasks on failure.
 val MAX_LOCAL_TASK_FAILURES = 1
@@ -2707,6 +2711,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing 
tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] 
locally
+  val 

spark git commit: [SPARK-11251] Fix page size calculation in local mode

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 163d53e82 -> 34e71c6d8


[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```

Author: Andrew Or 

Closes #9209 from andrewor14/fix-local-page-size.
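
A self-contained sketch of the master-string parsing this patch centralizes in
`SparkMasterRegex` / `numDriverCores` (the wrapper object here is made up; the
regexes and cases mirror the diff below):

```scala
object MasterCores {
  // Same shapes as SparkMasterRegex in the patch.
  private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  // Returns 0 when the driver is not used for execution (cluster modes).
  def numDriverCores(master: String): Int = {
    def toInt(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local" => 1
      case LOCAL_N_REGEX(threads) => toInt(threads)
      case LOCAL_N_FAILURES_REGEX(threads, _) => toInt(threads)
      case _ => 0
    }
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[32]", "local[*]", "spark://host:7077")
      .foreach(m => println(s"$m -> ${numDriverCores(m)} driver cores"))
  }
}
```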


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e71c6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e71c6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e71c6d

Branch: refs/heads/master
Commit: 34e71c6d89c1f2b6236dbf0d75cd12da08003c84
Parents: 163d53e
Author: Andrew Or 
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 15:58:08 2015 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  4 +-
 .../OutputCommitCoordinatorSuite.scala  |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34e71c6d/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccba3ed..a6857b4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   conf: SparkConf,
   isLocal: Boolean,
   listenerBus: LiveListenerBus): SparkEnv = {
-SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+SparkEnv.createDriverEnv(conf, isLocal, listenerBus, 
SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2561,24 +2561,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 
otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+def convertToInt(threads: String): Int = {
+  if (threads == "*") Runtime.getRuntime.availableProcessors() else 
threads.toInt
+}
+master match {
+  case "local" => 1
+  case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+  case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => 
convertToInt(threads)
+  case _ => 0 // driver is not used for execution
+}
+  }
+
+  /**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
   private def createTaskScheduler(
   sc: SparkContext,
   master: String): (SchedulerBackend, TaskScheduler) = {
-// Regular expression used for local[N] and local[*] master formats
-val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-// Regular expression for local[N, maxRetries], used in tests with failing 
tasks
-val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-// Regular expression for simulating a Spark cluster of [N, cores, memory] 
locally
-val LOCAL_CLUSTER_REGEX = 
"""local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-// Regular expression for connecting to Spark deploy clusters
-val SPARK_REGEX = """spark://(.*)""".r
-// Regular expression for connection to Mesos cluster by mesos:// or zk:// 
url
-val MESOS_REGEX = """(mesos|zk)://.*""".r
-// Regular expression for connection to Simr cluster
-val SIMR_REGEX = """simr://(.*)""".r
+import SparkMasterRegex._
 
 // When running locally, don't try to re-execute tasks on failure.
 val MAX_LOCAL_TASK_FAILURES = 1
@@ -2720,6 +2724,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing 
tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] 
locally
+  val LOCAL_CLUSTER_REGEX = 
"""local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+  // Regular expression for 

[2/2] spark git commit: Preparing development version 1.5.3-SNAPSHOT

2015-10-22 Thread pwendell
Preparing development version 1.5.3-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be3e3434
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be3e3434
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be3e3434

Branch: refs/heads/branch-1.5
Commit: be3e343453c032e0ae01bcaa5d359de9ef02e950
Parents: ad6ade1
Author: Patrick Wendell 
Authored: Thu Oct 22 16:02:11 2015 -0700
Committer: Patrick Wendell 
Committed: Thu Oct 22 16:02:11 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index f0c6c0c..6114f8c 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index fdbbf9d..dd9eb9e 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index bdf355f..350aaab 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 6b7f72c..0d87f37 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 4d43903..3982b3d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index e3fa0c0..033f222 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2
+1.5.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be3e3434/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 700e912..74e9cf4 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 

[1/2] spark git commit: Preparing Spark release v1.5.2-rc1

2015-10-22 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 a76cf51ed -> be3e34345


Preparing Spark release v1.5.2-rc1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad6ade12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad6ade12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad6ade12

Branch: refs/heads/branch-1.5
Commit: ad6ade12412361b5998898e5ce6b1e007fea02eb
Parents: a76cf51
Author: Patrick Wendell 
Authored: Thu Oct 22 16:02:05 2015 -0700
Committer: Patrick Wendell 
Committed: Thu Oct 22 16:02:05 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7671ba2..f0c6c0c 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 02e920d..fdbbf9d 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 03d26df..bdf355f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index eb1910e..6b7f72c 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 0de2f03..4d43903 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 66ab1b2..e3fa0c0 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.5.2-SNAPSHOT
+1.5.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad6ade12/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index c058490..700e912 

Git Push Summary

2015-10-22 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.2-rc1 [created] ad6ade124

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11098][CORE] Add Outbox to cache the sending messages to resolve the message disorder issue

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 34e71c6d8 -> a88c66ca8


[SPARK-11098][CORE] Add Outbox to cache the sending messages to resolve the 
message disorder issue

The current NettyRpc has a message order issue because it uses a thread pool to 
send messages. E.g., running the following two lines in the same thread,

```
ref.send("A")
ref.send("B")
```

The remote endpoint may see "B" before "A" because "A" and "B" are sent in parallel.
To resolve this issue, this PR adds an outbox for each connection: if we are still
connecting to the remote node when a message is sent, the message is cached in the
outbox, and the cached messages are sent one by one once the connection is established.
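
A much-simplified sketch of the buffering idea (this is not the actual `Outbox`
class added here, which also deals with connection setup, failures, and stopping):

```scala
import scala.collection.mutable

class MiniOutbox(sendNow: String => Unit) {
  private val buffer = new mutable.Queue[String]()
  private var connected = false

  def send(msg: String): Unit = synchronized {
    buffer.enqueue(msg)
    drain()
  }

  def onConnected(): Unit = synchronized {
    connected = true
    drain()
  }

  // Holding the lock while sending keeps the ordering guarantee obvious in this
  // sketch; the real Outbox sends asynchronously but still drains in FIFO order.
  private def drain(): Unit = {
    while (connected && buffer.nonEmpty) {
      sendNow(buffer.dequeue())
    }
  }
}

// ref.send("A"); ref.send("B") now reach the remote endpoint as "A", then "B".
```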

Author: zsxwing 

Closes #9197 from zsxwing/rpc-outbox.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a88c66ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a88c66ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a88c66ca

Branch: refs/heads/master
Commit: a88c66ca8780c7228dc909f904d31cd9464ee0e3
Parents: 34e71c6
Author: zsxwing 
Authored: Thu Oct 22 21:01:01 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 21:01:01 2015 -0700

--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 145 +++-
 .../org/apache/spark/rpc/netty/Outbox.scala | 222 +++
 2 files changed, 310 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a88c66ca/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e01cf1a..284284e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -20,6 +20,7 @@ import java.io._
 import java.net.{InetSocketAddress, URI}
 import java.nio.ByteBuffer
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -70,12 +71,30 @@ private[netty] class NettyRpcEnv(
   // Because TransportClientFactory.createClient is blocking, we need to run 
it in this thread pool
   // to implement non-blocking send/ask.
   // TODO: a non-blocking TransportClientFactory.createClient in future
-  private val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
+  private[netty] val clientConnectionExecutor = 
ThreadUtils.newDaemonCachedThreadPool(
 "netty-rpc-connection",
 conf.getInt("spark.rpc.connect.threads", 64))
 
   @volatile private var server: TransportServer = _
 
+  private val stopped = new AtomicBoolean(false)
+
+  /**
+   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a 
remote [[RpcAddress]],
+   * we just put messages to its [[Outbox]] to implement a non-blocking `send` 
method.
+   */
+  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
+
+  /**
+   * Remove the address's Outbox and stop it.
+   */
+  private[netty] def removeOutbox(address: RpcAddress): Unit = {
+val outbox = outboxes.remove(address)
+if (outbox != null) {
+  outbox.stop()
+}
+  }
+
   def start(port: Int): Unit = {
 val bootstraps: java.util.List[TransportServerBootstrap] =
   if (securityManager.isAuthenticationEnabled()) {
@@ -116,6 +135,30 @@ private[netty] class NettyRpcEnv(
 dispatcher.stop(endpointRef)
   }
 
+  private def postToOutbox(address: RpcAddress, message: OutboxMessage): Unit 
= {
+val targetOutbox = {
+  val outbox = outboxes.get(address)
+  if (outbox == null) {
+val newOutbox = new Outbox(this, address)
+val oldOutbox = outboxes.putIfAbsent(address, newOutbox)
+if (oldOutbox == null) {
+  newOutbox
+} else {
+  oldOutbox
+}
+  } else {
+outbox
+  }
+}
+if (stopped.get) {
+  // It's possible that we put `targetOutbox` after stopping. So we need 
to clean it.
+  outboxes.remove(address)
+  targetOutbox.stop()
+} else {
+  targetOutbox.send(message)
+}
+  }
+
   private[netty] def send(message: RequestMessage): Unit = {
 val remoteAddr = message.receiver.address
 if (remoteAddr == address) {
@@ -127,37 +170,28 @@ private[netty] class NettyRpcEnv(
   val ack = response.asInstanceOf[Ack]
   logTrace(s"Received ack from ${ack.sender}")
 case Failure(e) =>
-  logError(s"Exception when sending $message", e)
+  logWarning(s"Exception when sending $message", e)
   }(ThreadUtils.sameThread)
 } else {
 

spark git commit: [SPARK-11134][CORE] Increase LauncherBackendSuite timeout.

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a88c66ca8 -> fa6a4fbf0


[SPARK-11134][CORE] Increase LauncherBackendSuite timeout.

This test can take a little while to finish on slow / loaded machines.
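
For context, the pattern being tuned here is ScalaTest's Eventually: poll an assertion at a
fixed interval until a deadline. A minimal, standalone sketch (not Spark's actual
LauncherBackendSuite; the "application becomes ready" condition is invented for
illustration):

```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Retry an assertion every 100 ms, for up to 30 seconds, so a slow or heavily
// loaded machine has enough headroom before the check is declared failed.
object EventuallyTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    // Stand-in for handle.getAppId(): becomes defined after roughly two seconds.
    def appId: Option[String] =
      if (System.nanoTime() - start > 2000L * 1000 * 1000) Some("app-sketch-0001") else None

    eventually(timeout(30.seconds), interval(100.millis)) {
      assert(appId.isDefined, "application id not assigned yet")
    }
    println(s"got ${appId.get} within the timeout")
  }
}
```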

Author: Marcelo Vanzin 

Closes #9235 from vanzin/SPARK-11134.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa6a4fbf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa6a4fbf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa6a4fbf

Branch: refs/heads/master
Commit: fa6a4fbf08c8cca36cbe9f0d2bd20bc7be2ca45d
Parents: a88c66c
Author: Marcelo Vanzin 
Authored: Thu Oct 22 22:41:21 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 22:41:21 2015 -0700

--
 .../scala/org/apache/spark/launcher/LauncherBackendSuite.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa6a4fbf/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala 
b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 07e8869..639d1da 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -54,13 +54,13 @@ class LauncherBackendSuite extends SparkFunSuite with 
Matchers {
   .startApplication()
 
 try {
-  eventually(timeout(10 seconds), interval(100 millis)) {
+  eventually(timeout(30 seconds), interval(100 millis)) {
 handle.getAppId() should not be (null)
   }
 
   handle.stop()
 
-  eventually(timeout(10 seconds), interval(100 millis)) {
+  eventually(timeout(30 seconds), interval(100 millis)) {
 handle.getState() should be (SparkAppHandle.State.KILLED)
   }
 } finally {





spark git commit: Fix a (very tiny) typo

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fa6a4fbf0 -> b1c1597e3


Fix a (very tiny) typo

Author: Jacek Laskowski 

Closes #9230 from jaceklaskowski/utils-seconds-typo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1c1597e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1c1597e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1c1597e

Branch: refs/heads/master
Commit: b1c1597e3c47f1912809f3c5ab21833fa4241b54
Parents: fa6a4fb
Author: Jacek Laskowski 
Authored: Thu Oct 22 22:42:15 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 22:42:15 2015 -0700

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1c1597e/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5595040..5a976ee 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -952,7 +952,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds 
for internal use. If
+   * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for 
internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
   def timeStringAsSeconds(str: String): Long = {
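
The corrected docstring is the whole contract here, so a small hedged usage sketch may help
(assumes spark-core on the classpath; the object sits under the org.apache.spark package
only because Utils is private[spark], and the exact handling of sub-second inputs is an
assumption, not shown in this diff):

```
package org.apache.spark

import org.apache.spark.util.Utils

// Hedged usage sketch: timeStringAsSeconds returns a value in *seconds*,
// and a suffix-less string is interpreted as seconds.
object TimeStringAsSecondsSketch {
  def main(args: Array[String]): Unit = {
    println(Utils.timeStringAsSeconds("50s"))   // 50
    println(Utils.timeStringAsSeconds("2min"))  // 120 -- minutes converted to seconds
    println(Utils.timeStringAsSeconds("10"))    // 10  -- no suffix means seconds
  }
}
```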





[4/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations

2015-10-22 Thread joshrosen
[SPARK-10708] Consolidate sort shuffle implementations

There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Now that
UnsafeShuffleManager supports large records, the two provide the same set of functionality,
so I think we should replace SortShuffleManager's serialized shuffle implementation with
UnsafeShuffleManager's and merge the two managers together.
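
To make the consolidation concrete, here is a simplified, hypothetical sketch of the core
idea: one manager that picks a write path per shuffle (bypass merge-sort, serialized
"unsafe" sort, or the general deserialized sort) instead of splitting that choice across two
managers. The selection criteria below paraphrase Spark's rules and are not the exact
SortShuffleManager code:

```
// Hypothetical dispatch logic standing in for properties of a ShuffleDependency.
sealed trait ShuffleWritePath
case object BypassMergeSort extends ShuffleWritePath   // few partitions, no map-side combine
case object SerializedSort extends ShuffleWritePath    // "unsafe" path: sorts serialized records
case object DeserializedSort extends ShuffleWritePath  // general path: sorts Java objects

object ConsolidatedShuffleSketch {
  def choosePath(
      numPartitions: Int,
      mapSideCombine: Boolean,
      serializerRelocatable: Boolean,
      bypassThreshold: Int = 200): ShuffleWritePath = {
    if (!mapSideCombine && numPartitions <= bypassThreshold) {
      BypassMergeSort
    } else if (!mapSideCombine && serializerRelocatable) {
      SerializedSort
    } else {
      DeserializedSort
    }
  }

  def main(args: Array[String]): Unit = {
    println(choosePath(numPartitions = 10, mapSideCombine = false, serializerRelocatable = true))
    println(choosePath(numPartitions = 5000, mapSideCombine = false, serializerRelocatable = true))
    println(choosePath(numPartitions = 5000, mapSideCombine = true, serializerRelocatable = false))
  }
}
```

In the consolidated manager this choice is made when the shuffle is registered, returning
different handle types (for example the BypassMergeSortShuffleHandle that appears in the
test diff below), which the writer selection then dispatches on.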

Author: Josh Rosen 

Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d06adf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d06adf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d06adf

Branch: refs/heads/master
Commit: f6d06adf05afa9c5386dc2396c94e7a98730289f
Parents: 94e2064
Author: Josh Rosen 
Authored: Thu Oct 22 09:46:30 2015 -0700
Committer: Josh Rosen 
Committed: Thu Oct 22 09:46:30 2015 -0700

--
 .../sort/BypassMergeSortShuffleWriter.java  | 106 +++-
 .../spark/shuffle/sort/PackedRecordPointer.java |  92 +++
 .../shuffle/sort/ShuffleExternalSorter.java | 491 
 .../shuffle/sort/ShuffleInMemorySorter.java | 124 
 .../shuffle/sort/ShuffleSortDataFormat.java |  67 +++
 .../shuffle/sort/SortShuffleFileWriter.java |  53 --
 .../apache/spark/shuffle/sort/SpillInfo.java|  37 ++
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 489 
 .../shuffle/unsafe/PackedRecordPointer.java |  92 ---
 .../apache/spark/shuffle/unsafe/SpillInfo.java  |  37 --
 .../unsafe/UnsafeShuffleExternalSorter.java | 479 
 .../unsafe/UnsafeShuffleInMemorySorter.java | 124 
 .../unsafe/UnsafeShuffleSortDataFormat.java |  67 ---
 .../shuffle/unsafe/UnsafeShuffleWriter.java | 489 
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/shuffle/sort/SortShuffleManager.scala | 175 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  28 +-
 .../shuffle/unsafe/UnsafeShuffleManager.scala   | 202 ---
 .../spark/util/collection/ChainedBuffer.scala   | 146 -
 .../spark/util/collection/ExternalSorter.scala  |  35 +-
 .../PartitionedSerializedPairBuffer.scala   | 273 -
 .../shuffle/sort/PackedRecordPointerSuite.java  | 102 
 .../sort/ShuffleInMemorySorterSuite.java| 124 
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  | 560 +++
 .../unsafe/PackedRecordPointerSuite.java| 101 
 .../UnsafeShuffleInMemorySorterSuite.java   | 124 
 .../unsafe/UnsafeShuffleWriterSuite.java| 560 ---
 .../org/apache/spark/SortShuffleSuite.scala |  65 +++
 .../spark/scheduler/DAGSchedulerSuite.scala |   6 +-
 .../BypassMergeSortShuffleWriterSuite.scala |  64 ++-
 .../shuffle/sort/SortShuffleManagerSuite.scala  | 131 +
 .../shuffle/sort/SortShuffleWriterSuite.scala   |  45 --
 .../unsafe/UnsafeShuffleManagerSuite.scala  | 129 -
 .../shuffle/unsafe/UnsafeShuffleSuite.scala | 102 
 .../util/collection/ChainedBufferSuite.scala| 144 -
 .../PartitionedSerializedPairBufferSuite.scala  | 148 -
 docs/configuration.md   |   7 +-
 project/MimaExcludes.scala  |   9 +-
 .../apache/spark/sql/execution/Exchange.scala   |  23 +-
 .../execution/UnsafeRowSerializerSuite.scala|   9 +-
 40 files changed, 2600 insertions(+), 3461 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index f5d80bb..ee82d67 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -21,21 +21,30 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import javax.annotation.Nullable;
 
+import scala.None$;
+import scala.Option;
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.Iterator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import 

[3/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations

2015-10-22 Thread joshrosen
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
deleted file mode 100644
index e73ba39..000
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.LinkedList;
-
-import scala.Tuple2;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.TaskContext;
-import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.serializer.DummySerializerInstance;
-import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.DiskBlockObjectWriter;
-import org.apache.spark.storage.TempShuffleBlockId;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
-import org.apache.spark.util.Utils;
-
-/**
- * An external sorter that is specialized for sort-based shuffle.
- * 
- * Incoming records are appended to data pages. When all records have been 
inserted (or when the
- * current thread's shuffle memory limit is reached), the in-memory records 
are sorted according to
- * their partition ids (using a {@link UnsafeShuffleInMemorySorter}). The 
sorted records are then
- * written to a single output file (or multiple files, if we've spilled). The 
format of the output
- * files is the same as the format of the final output file written by
- * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output 
partition's records are
- * written as a single serialized, compressed stream that can be read with a 
new decompression and
- * deserialization stream.
- * 
- * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter 
does not merge its
- * spill files. Instead, this merging is performed in {@link 
UnsafeShuffleWriter}, which uses a
- * specialized merge procedure that avoids extra serialization/deserialization.
- */
-final class UnsafeShuffleExternalSorter {
-
-  private final Logger logger = 
LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
-
-  @VisibleForTesting
-  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
-
-  private final int initialSize;
-  private final int numPartitions;
-  private final int pageSizeBytes;
-  @VisibleForTesting
-  final int maxRecordSizeBytes;
-  private final TaskMemoryManager taskMemoryManager;
-  private final ShuffleMemoryManager shuffleMemoryManager;
-  private final BlockManager blockManager;
-  private final TaskContext taskContext;
-  private final ShuffleWriteMetrics writeMetrics;
-
-  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
-  private final int fileBufferSizeBytes;
-
-  /**
-   * Memory pages that hold the records being sorted. The pages in this list 
are freed when
-   * spilling, although in principle we could recycle these pages across 
spills (on the other hand,
-   * this might not be necessary if we maintained a pool of re-usable pages in 
the TaskMemoryManager
-   * itself).
-   */
-  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
-
-  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
-
-  /** Peak memory used by this sorter so far, in bytes. **/
-  private long peakMemoryUsedBytes;
-
-  // These variables are reset after spilling:
-  @Nullable private UnsafeShuffleInMemorySorter inMemSorter;
-  @Nullable private MemoryBlock currentPage = null;
-  private long currentPagePosition = -1;
-  private long 
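
Stepping outside the diff for a moment: the class doc above describes the spill-on-threshold,
sort-by-partition-id design. A deliberately tiny, hypothetical sketch of that idea, ignoring
off-heap pages, memory managers, and the merge step:

```
import scala.collection.mutable.ArrayBuffer

// Buffer incoming records with their partition ids; when a size threshold is reached,
// sort the buffered records by partition id and write them out as one sorted "spill" run.
class TinyPartitionSorter(maxBuffered: Int, writeRun: Seq[(Int, Array[Byte])] => Unit) {
  private val buffer = new ArrayBuffer[(Int, Array[Byte])]()

  def insert(partitionId: Int, record: Array[Byte]): Unit = {
    buffer += ((partitionId, record))
    if (buffer.size >= maxBuffered) spill()
  }

  // Sort only by partition id -- within a partition, record order does not matter.
  def spill(): Unit = {
    if (buffer.nonEmpty) {
      writeRun(buffer.sortBy(_._1).toSeq)
      buffer.clear()
    }
  }
}

object TinyPartitionSorterExample {
  def main(args: Array[String]): Unit = {
    val sorter = new TinyPartitionSorter(maxBuffered = 4,
      run => println(run.map { case (p, r) => s"p$p:${r.length}B" }.mkString(", ")))
    Seq(3, 1, 2, 1, 0, 3).foreach(p => sorter.insert(p, Array.fill(8)(0: Byte)))
    sorter.spill() // flush whatever is left in the buffer
  }
}
```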

[2/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations

2015-10-22 Thread joshrosen
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
deleted file mode 100644
index 87a786b..000
--- 
a/core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.collection
-
-import java.io.InputStream
-import java.nio.IntBuffer
-import java.util.Comparator
-
-import org.apache.spark.serializer.{JavaSerializerInstance, SerializerInstance}
-import org.apache.spark.storage.DiskBlockObjectWriter
-import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._
-
-/**
- * Append-only buffer of key-value pairs, each with a corresponding partition 
ID, that serializes
- * its records upon insert and stores them as raw bytes.
- *
- * We use two data-structures to store the contents. The serialized records 
are stored in a
- * ChainedBuffer that can expand gracefully as records are added. This buffer 
is accompanied by a
- * metadata buffer that stores pointers into the data buffer as well as the 
partition ID of each
- * record. Each entry in the metadata buffer takes up a fixed amount of space.
- *
- * Sorting the collection means swapping entries in the metadata buffer - the 
record buffer need not
- * be modified at all. Storing the partition IDs in the metadata buffer means 
that comparisons can
- * happen without following any pointers, which should minimize cache misses.
- *
- * Currently, only sorting by partition is supported.
- *
- * Each record is laid out inside the the metaBuffer as follows. keyStart, a 
long, is split across
- * two integers:
- *
- *   +-------------+------------+------------+-------------+
- *   |         keyStart         | keyValLen  | partitionId |
- *   +-------------+------------+------------+-------------+
- *
- * The buffer can support up to `536870911 (2 ^ 29 - 1)` records.
- *
- * @param metaInitialRecords The initial number of entries in the metadata 
buffer.
- * @param kvBlockSize The size of each byte buffer in the ChainedBuffer used 
to store the records.
- * @param serializerInstance the serializer used for serializing inserted 
records.
- */
-private[spark] class PartitionedSerializedPairBuffer[K, V](
-metaInitialRecords: Int,
-kvBlockSize: Int,
-serializerInstance: SerializerInstance)
-  extends WritablePartitionedPairCollection[K, V] with SizeTracker {
-
-  if (serializerInstance.isInstanceOf[JavaSerializerInstance]) {
-throw new IllegalArgumentException("PartitionedSerializedPairBuffer does 
not support" +
-  " Java-serialized objects.")
-  }
-
-  require(metaInitialRecords <= MAXIMUM_RECORDS,
-s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records")
-  private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE)
-
-  private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize)
-  private val kvOutputStream = new ChainedBufferOutputStream(kvBuffer)
-  private val kvSerializationStream = 
serializerInstance.serializeStream(kvOutputStream)
-
-  def insert(partition: Int, key: K, value: V): Unit = {
-if (metaBuffer.position == metaBuffer.capacity) {
-  growMetaBuffer()
-}
-
-val keyStart = kvBuffer.size
-kvSerializationStream.writeKey[Any](key)
-kvSerializationStream.writeValue[Any](value)
-kvSerializationStream.flush()
-val keyValLen = (kvBuffer.size - keyStart).toInt
-
-// keyStart, a long, gets split across two ints
-metaBuffer.put(keyStart.toInt)
-metaBuffer.put((keyStart >> 32).toInt)
-metaBuffer.put(keyValLen)
-metaBuffer.put(partition)
-  }
-
-  /** Double the size of the array because we've reached capacity */
-  private def growMetaBuffer(): Unit = {
-if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) {
-  throw new IllegalStateException(s"Can't insert more than 
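
The doc comment and the insert() code above show keyStart, a long, being split across two
ints in the metadata buffer. A small standalone sketch of that packing trick (not the removed
class itself):

```
// Store the low and high 32 bits of a long as two ints and reassemble them later.
object KeyStartPackingSketch {
  def split(keyStart: Long): (Int, Int) =
    (keyStart.toInt, (keyStart >> 32).toInt)          // low word, high word

  def combine(low: Int, high: Int): Long =
    (low.toLong & 0xFFFFFFFFL) | (high.toLong << 32)  // mask low word to avoid sign extension

  def main(args: Array[String]): Unit = {
    val keyStart = 10000000000L                       // larger than Int.MaxValue on purpose
    val (low, high) = split(keyStart)
    assert(combine(low, high) == keyStart)
    println(s"$keyStart -> low=$low, high=$high -> ${combine(low, high)}")
  }
}
```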

[1/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations

2015-10-22 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 94e2064fa -> f6d06adf0


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 341f56d..b92a302 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -33,7 +33,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics}
-import org.apache.spark.serializer.{SerializerInstance, Serializer, 
JavaSerializer}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.serializer.{JavaSerializer, SerializerInstance}
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
 
@@ -42,25 +43,31 @@ class BypassMergeSortShuffleWriterSuite extends 
SparkFunSuite with BeforeAndAfte
   @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
   @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: 
DiskBlockManager = _
   @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: 
IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency: 
ShuffleDependency[Int, Int, Int] = _
 
   private var taskMetrics: TaskMetrics = _
-  private var shuffleWriteMetrics: ShuffleWriteMetrics = _
   private var tempDir: File = _
   private var outputFile: File = _
   private val conf: SparkConf = new SparkConf(loadDefaults = false)
   private val temporaryFilesCreated: mutable.Buffer[File] = new 
ArrayBuffer[File]()
   private val blockIdToFileMap: mutable.Map[BlockId, File] = new 
mutable.HashMap[BlockId, File]
-  private val shuffleBlockId: ShuffleBlockId = new ShuffleBlockId(0, 0, 0)
-  private val serializer: Serializer = new JavaSerializer(conf)
+  private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _
 
   override def beforeEach(): Unit = {
 tempDir = Utils.createTempDir()
 outputFile = File.createTempFile("shuffle", null, tempDir)
-shuffleWriteMetrics = new ShuffleWriteMetrics
 taskMetrics = new TaskMetrics
-taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
 MockitoAnnotations.initMocks(this)
+shuffleHandle = new BypassMergeSortShuffleHandle[Int, Int](
+  shuffleId = 0,
+  numMaps = 2,
+  dependency = dependency
+)
+when(dependency.partitioner).thenReturn(new HashPartitioner(7))
+when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf)))
 when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
 when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
 when(blockManager.getDiskWriter(
   any[BlockId],
@@ -107,18 +114,20 @@ class BypassMergeSortShuffleWriterSuite extends 
SparkFunSuite with BeforeAndAfte
 
   test("write empty iterator") {
 val writer = new BypassMergeSortShuffleWriter[Int, Int](
-  new SparkConf(loadDefaults = false),
   blockManager,
-  new HashPartitioner(7),
-  shuffleWriteMetrics,
-  serializer
+  blockResolver,
+  shuffleHandle,
+  0, // MapId
+  taskContext,
+  conf
 )
-writer.insertAll(Iterator.empty)
-val partitionLengths = writer.writePartitionedFile(shuffleBlockId, 
taskContext, outputFile)
-assert(partitionLengths.sum === 0)
+writer.write(Iterator.empty)
+writer.stop( /* success = */ true)
+assert(writer.getPartitionLengths.sum === 0)
 assert(outputFile.exists())
 assert(outputFile.length() === 0)
 assert(temporaryFilesCreated.isEmpty)
+val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
 assert(shuffleWriteMetrics.shuffleBytesWritten === 0)
 assert(shuffleWriteMetrics.shuffleRecordsWritten === 0)
 assert(taskMetrics.diskBytesSpilled === 0)
@@ -129,17 +138,19 @@ class BypassMergeSortShuffleWriterSuite extends 
SparkFunSuite with BeforeAndAfte
 def records: Iterator[(Int, Int)] =
   Iterator((1, 1), (5, 5)) ++ (0 until 10).iterator.map(x => (2, 2))
 val writer = new BypassMergeSortShuffleWriter[Int, Int](
-  new SparkConf(loadDefaults = false),
   blockManager,
-  new HashPartitioner(7),
-  shuffleWriteMetrics,
-  serializer
+  blockResolver,
+  shuffleHandle,
+  0, // MapId
+  taskContext,
+  conf
 )
-writer.insertAll(records)
+writer.write(records)
+writer.stop( /* success = */ true)