Repository: spark
Updated Branches:
  refs/heads/master 257cde7c3 -> a95043b17


http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index ffe388c..e916e68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
 import scala.collection.mutable.HashSet
 
-import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
+import org.apache.spark.{AccumulatorParam, Accumulator}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.SparkContext._
 import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.types._
@@ -43,7 +45,7 @@ package object debug {
    * Augments [[SQLContext]] with debug methods.
    */
   implicit class DebugSQLContext(sqlContext: SQLContext) {
-    def debug() = {
+    def debug(): Unit = {
       sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
     }
   }
@@ -88,7 +90,7 @@ package object debug {
   }
 
   private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
-    def output = child.output
+    def output: Seq[Attribute] = child.output
 
     implicit object SetAccumulatorParam extends 
AccumulatorParam[HashSet[String]] {
       def zero(initialValue: HashSet[String]): HashSet[String] = {
@@ -109,10 +111,10 @@ package object debug {
      */
     case class ColumnMetrics(
         elementTypes: Accumulator[HashSet[String]] = 
sparkContext.accumulator(HashSet.empty))
-    val tupleCount = sparkContext.accumulator[Int](0)
+    val tupleCount: Accumulator[Int] = sparkContext.accumulator[Int](0)
 
-    val numColumns = child.output.size
-    val columnStats = Array.fill(child.output.size)(new ColumnMetrics())
+    val numColumns: Int = child.output.size
+    val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics())
 
     def dumpStats(): Unit = {
       println(s"== ${child.simpleString} ==")
@@ -123,11 +125,11 @@ package object debug {
       }
     }
 
-    def execute() = {
+    def execute(): RDD[Row] = {
       child.execute().mapPartitions { iter =>
         new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
+          def hasNext: Boolean = iter.hasNext
+          def next(): Row = {
             val currentRow = iter.next()
             tupleCount += 1
             var i = 0
@@ -180,18 +182,18 @@ package object debug {
   private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
     import TypeCheck._
 
-    override def nodeName  = ""
+    override def nodeName: String = ""
 
     /* Only required when defining this class in a REPL.
     override def makeCopy(args: Array[Object]): this.type =
       TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
     */
 
-    def output = child.output
+    def output: Seq[Attribute] = child.output
 
-    def children = child :: Nil
+    def children: List[SparkPlan] = child :: Nil
 
-    def execute() = {
+    def execute(): RDD[Row] = {
       child.execute().map { row =>
         try typeCheck(row, child.schema) catch {
           case e: Exception =>

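The DebugNode hunk above only shows part of its SetAccumulatorParam (addInPlace falls outside the hunk). As a rough, self-contained sketch of such a set-valued accumulator parameter on the Spark 1.x Accumulator API; the addInPlace body below is an assumption, not taken from this patch:

    import scala.collection.mutable.HashSet

    import org.apache.spark.AccumulatorParam

    // Sketch of a set-valued AccumulatorParam in the spirit of DebugNode's
    // SetAccumulatorParam: tasks add type names, the driver sees their union.
    object SetAccumulatorParamSketch extends AccumulatorParam[HashSet[String]] {
      def zero(initialValue: HashSet[String]): HashSet[String] = {
        initialValue.clear()
        initialValue
      }
      // Assumed merge: fold one partial set into the other in place.
      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
        v1 ++= v2
        v1
      }
    }

Declared implicitly, as in the hunk, it lets sparkContext.accumulator(HashSet.empty[String]) resolve the right AccumulatorParam without naming it explicitly.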
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 2dd22c0..926f5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.sql.execution.joins
 
+import org.apache.spark.rdd.RDD
+
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
 /**
@@ -42,7 +44,7 @@ case class BroadcastHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
-  val timeout = {
+  val timeout: Duration = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
       Duration.Inf
@@ -53,7 +55,7 @@ case class BroadcastHashJoin(
 
   override def outputPartitioning: Partitioning = 
streamedPlan.outputPartitioning
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[Distribution] =
     UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
 
   @transient
@@ -64,7 +66,7 @@ case class BroadcastHashJoin(
     sparkContext.broadcast(hashed)
   }
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>

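The timeout val typed above comes from spark.sql.broadcastTimeout, and the hunk only shows the negative branch. A hedged reconstruction of the full conversion (the .seconds branch is assumed):

    import scala.concurrent.duration._

    // Assumed shape of the logic: a negative broadcastTimeout means wait
    // indefinitely, otherwise the value is interpreted as seconds.
    def broadcastTimeout(timeoutValue: Int): Duration =
      if (timeoutValue < 0) Duration.Inf else timeoutValue.seconds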
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 2ab064f..3ef1e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Expression, Row}
-import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
 /**
@@ -34,11 +34,11 @@ case class BroadcastLeftSemiJoinHash(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashJoin {
 
-  override val buildSide = BuildRight
+  override val buildSide: BuildSide = BuildRight
 
-  override def output = left.output
+  override def output: Seq[Attribute] = left.output
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
     val hashSet = new java.util.HashSet[Row]()
     var currentRow: Row = null

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 36aad13..83b1a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, 
RightOuter}
@@ -44,7 +45,7 @@ case class BroadcastNestedLoopJoin(
 
   override def outputPartitioning: Partitioning = streamed.outputPartitioning
 
-  override def output = {
+  override def output: Seq[Attribute] = {
     joinType match {
       case LeftOuter =>
         left.output ++ right.output.map(_.withNullability(true))
@@ -63,7 +64,7 @@ case class BroadcastNestedLoopJoin(
         .map(c => BindReferences.bindReference(c, left.output ++ right.output))
         .getOrElse(Literal(true)))
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val broadcastedRelation =
       
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 76c14c0..1cbc983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
 /**
@@ -26,9 +28,9 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
  */
 @DeveloperApi
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends 
BinaryNode {
-  override def output = left.output ++ right.output
+  override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 4012d75..851de16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -41,7 +41,7 @@ trait HashJoin {
     case BuildRight => (rightKeys, leftKeys)
   }
 
-  override def output = left.output ++ right.output
+  override def output: Seq[Attribute] = left.output ++ right.output
 
   @transient protected lazy val buildSideKeyGenerator: Projection =
     newProjection(buildKeys, buildPlan.output)
@@ -65,7 +65,7 @@ trait HashJoin {
         (currentMatchPosition != -1 && currentMatchPosition < 
currentHashMatches.size) ||
           (streamIter.hasNext && fetchNext())
 
-      override final def next() = {
+      override final def next(): Row = {
         val ret = buildSide match {
           case BuildRight => joinRow(currentStreamedRow, 
currentHashMatches(currentMatchPosition))
           case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), 
currentStreamedRow)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 59ef904..a396c0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.joins
 
 import java.util.{HashMap => JavaHashMap}
 
+import org.apache.spark.rdd.RDD
+
 import scala.collection.JavaConversions._
 
 import org.apache.spark.annotation.DeveloperApi
@@ -49,10 +51,10 @@ case class HashOuterJoin(
     case x => throw new Exception(s"HashOuterJoin should not take $x as the 
JoinType")
   }
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def output = {
+  override def output: Seq[Attribute] = {
     joinType match {
       case LeftOuter =>
         left.output ++ right.output.map(_.withNullability(true))
@@ -78,12 +80,12 @@ case class HashOuterJoin(
 
   private[this] def leftOuterIterator(
       key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] 
= {
-    val ret: Iterable[Row] = (
+    val ret: Iterable[Row] = {
       if (!key.anyNull) {
         val temp = rightIter.collect {
-          case r if (boundCondition(joinedRow.withRight(r))) => joinedRow.copy
+          case r if boundCondition(joinedRow.withRight(r)) => joinedRow.copy()
         }
-        if (temp.size  == 0) {
+        if (temp.size == 0) {
           joinedRow.withRight(rightNullRow).copy :: Nil
         } else {
           temp
@@ -91,19 +93,19 @@ case class HashOuterJoin(
       } else {
         joinedRow.withRight(rightNullRow).copy :: Nil
       }
-    )
+    }
     ret.iterator
   }
 
   private[this] def rightOuterIterator(
       key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] 
= {
 
-    val ret: Iterable[Row] = (
+    val ret: Iterable[Row] = {
       if (!key.anyNull) {
         val temp = leftIter.collect {
-          case l if (boundCondition(joinedRow.withLeft(l))) => joinedRow.copy
+          case l if boundCondition(joinedRow.withLeft(l)) => joinedRow.copy
         }
-        if (temp.size  == 0) {
+        if (temp.size == 0) {
           joinedRow.withLeft(leftNullRow).copy :: Nil
         } else {
           temp
@@ -111,7 +113,7 @@ case class HashOuterJoin(
       } else {
         joinedRow.withLeft(leftNullRow).copy :: Nil
       }
-    )
+    }
     ret.iterator
   }
 
@@ -130,12 +132,12 @@ case class HashOuterJoin(
           // 1. For those matched (satisfy the join condition) records with 
both sides filled,
           //    append them directly
 
-          case (r, idx) if (boundCondition(joinedRow.withRight(r)))=> {
+          case (r, idx) if boundCondition(joinedRow.withRight(r)) =>
             matched = true
             // if the row satisfy the join condition, add its index into the 
matched set
             rightMatchedSet.add(idx)
-            joinedRow.copy
-          }
+            joinedRow.copy()
+
         } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
           // 2. For those unmatched records in left, append additional records 
with empty right.
 
@@ -143,22 +145,21 @@ case class HashOuterJoin(
           // as we don't know whether we need to append it until finish 
iterating all
           // of the records in right side.
           // If we didn't get any proper row, then append a single row with 
empty right.
-          joinedRow.withRight(rightNullRow).copy
+          joinedRow.withRight(rightNullRow).copy()
         })
       } ++ rightIter.zipWithIndex.collect {
         // 3. For those unmatched records in right, append additional records 
with empty left.
 
         // Re-visiting the records in right, and append additional row with 
empty left, if its not
         // in the matched set.
-        case (r, idx) if (!rightMatchedSet.contains(idx)) => {
-          joinedRow(leftNullRow, r).copy
-        }
+        case (r, idx) if !rightMatchedSet.contains(idx) =>
+          joinedRow(leftNullRow, r).copy()
       }
     } else {
       leftIter.iterator.map[Row] { l =>
-        joinedRow(l, rightNullRow).copy
+        joinedRow(l, rightNullRow).copy()
       } ++ rightIter.iterator.map[Row] { r =>
-        joinedRow(leftNullRow, r).copy
+        joinedRow(leftNullRow, r).copy()
       }
     }
   }
@@ -182,13 +183,13 @@ case class HashOuterJoin(
     hashTable
   }
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged 
join?)
 
       joinType match {
-        case LeftOuter => {
+        case LeftOuter =>
           val rightHashTable = buildHashTable(rightIter, 
newProjection(rightKeys, right.output))
           val keyGenerator = newProjection(leftKeys, left.output)
           leftIter.flatMap( currentRow => {
@@ -196,8 +197,8 @@ case class HashOuterJoin(
             joinedRow.withLeft(currentRow)
             leftOuterIterator(rowKey, joinedRow, 
rightHashTable.getOrElse(rowKey, EMPTY_LIST))
           })
-        }
-        case RightOuter => {
+
+        case RightOuter =>
           val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, 
left.output))
           val keyGenerator = newProjection(rightKeys, right.output)
           rightIter.flatMap ( currentRow => {
@@ -205,8 +206,8 @@ case class HashOuterJoin(
             joinedRow.withRight(currentRow)
             rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, 
EMPTY_LIST), joinedRow)
           })
-        }
-        case FullOuter => {
+
+        case FullOuter =>
           val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, 
left.output))
           val rightHashTable = buildHashTable(rightIter, 
newProjection(rightKeys, right.output))
           (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { 
key =>
@@ -214,7 +215,7 @@ case class HashOuterJoin(
               leftHashTable.getOrElse(key, EMPTY_LIST),
               rightHashTable.getOrElse(key, EMPTY_LIST), joinedRow)
           }
-        }
+
         case x => throw new Exception(s"HashOuterJoin should not take $x as 
the JoinType")
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 38b8993..2fa1cf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -38,7 +38,7 @@ private[joins] sealed trait HashedRelation {
 private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, 
CompactBuffer[Row]])
   extends HashedRelation with Serializable {
 
-  override def get(key: Row) = hashTable.get(key)
+  override def get(key: Row): CompactBuffer[Row] = hashTable.get(key)
 }
 
 
@@ -49,7 +49,7 @@ private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, Com
 private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, 
Row])
   extends HashedRelation with Serializable {
 
-  override def get(key: Row) = {
+  override def get(key: Row): CompactBuffer[Row] = {
     val v = hashTable.get(key)
     if (v eq null) null else CompactBuffer(v)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 60003d1..1fa7e7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
@@ -35,12 +36,13 @@ case class LeftSemiJoinBNL(
 
   override def outputPartitioning: Partitioning = streamed.outputPartitioning
 
-  override def output = left.output
+  override def output: Seq[Attribute] = left.output
 
   /** The Streamed Relation */
-  override def left = streamed
+  override def left: SparkPlan = streamed
+
   /** The Broadcast relation */
-  override def right = broadcast
+  override def right: SparkPlan = broadcast
 
   @transient private lazy val boundCondition =
     InterpretedPredicate(
@@ -48,7 +50,7 @@ case class LeftSemiJoinBNL(
         .map(c => BindReferences.bindReference(c, left.output ++ right.output))
         .getOrElse(Literal(true)))
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     val broadcastedRelation =
       
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index ea7babf..a04f2a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Expression, Row}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
@@ -34,14 +35,14 @@ case class LeftSemiJoinHash(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode with HashJoin {
 
-  override val buildSide = BuildRight
+  override val buildSide: BuildSide = BuildRight
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def output = left.output
+  override def output: Seq[Attribute] = left.output
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, 
streamIter) =>
       val hashSet = new java.util.HashSet[Row]()
       var currentRow: Row = null

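For context on the java.util.HashSet in LeftSemiJoinHash and BroadcastLeftSemiJoinHash above: a left semi join keeps each left row whose join key appears somewhere on the right, without duplicating output. A stripped-down sketch of that strategy, ignoring null-key handling and the Row/Projection machinery used in the real operators:

    // Simplified semi-join sketch: collect right-side keys into a hash set,
    // then stream the left side and keep rows whose key is present.
    def leftSemiJoin[K, R](leftRows: Iterator[R], rightRows: Iterator[R])(key: R => K): Iterator[R] = {
      val rightKeys = new java.util.HashSet[K]()
      rightRows.foreach(r => rightKeys.add(key(r)))
      leftRows.filter(r => rightKeys.contains(key(r)))
    }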
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index 418c1c2..a6cd833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Partitioning}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
@@ -38,10 +40,10 @@ case class ShuffledHashJoin(
 
   override def outputPartitioning: Partitioning = left.outputPartitioning
 
-  override def requiredChildDistribution =
+  override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, 
streamIter) =>
       val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
       hashJoin(streamIter, hashed)

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 33632b8..5b308d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution
 
 import java.util.{List => JList, Map => JMap}
 
+import org.apache.spark.rdd.RDD
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -48,11 +50,13 @@ private[spark] case class PythonUDF(
     dataType: DataType,
     children: Seq[Expression]) extends Expression with SparkLogging {
 
-  override def toString = s"PythonUDF#$name(${children.mkString(",")})"
+  override def toString: String = s"PythonUDF#$name(${children.mkString(",")})"
 
   def nullable: Boolean = true
 
-  override def eval(input: Row) = sys.error("PythonUDFs can not be directly evaluated.")
+  override def eval(input: Row): PythonUDF.this.EvaluatedType = {
+    sys.error("PythonUDFs can not be directly evaluated.")
+  }
 }
 
 /**
@@ -63,7 +67,7 @@ private[spark] case class PythonUDF(
  * multiple child operators.
  */
 private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan) = plan transform {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Skip EvaluatePython nodes.
     case p: EvaluatePython => p
 
@@ -107,7 +111,7 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
 }
 
 object EvaluatePython {
-  def apply(udf: PythonUDF, child: LogicalPlan) =
+  def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
     new EvaluatePython(udf, child, AttributeReference("pythonUDF", 
udf.dataType)())
 
   /**
@@ -205,10 +209,10 @@ case class EvaluatePython(
     resultAttribute: AttributeReference)
   extends logical.UnaryNode {
 
-  def output = child.output :+ resultAttribute
+  def output: Seq[Attribute] = child.output :+ resultAttribute
 
   // References should not include the produced attribute.
-  override def references = udf.references
+  override def references: AttributeSet = udf.references
 }
 
 /**
@@ -219,9 +223,10 @@ case class EvaluatePython(
 @DeveloperApi
 case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], 
child: SparkPlan)
   extends SparkPlan {
-  def children = child :: Nil
 
-  def execute() = {
+  def children: Seq[SparkPlan] = child :: Nil
+
+  def execute(): RDD[Row] = {
     // TODO: Clean up after ourselves?
     val childResults = child.execute().map(_.copy()).cache()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 87304ce..3266b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -306,7 +306,8 @@ private[sql] class JDBCRDD(
   /**
    * Runs the SQL query against the JDBC driver.
    */
-  override def compute(thePart: Partition, context: TaskContext) = new Iterator[Row] {
+  override def compute(thePart: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row]
+  {
     var closed = false
     var finished = false
     var gotNext = false

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 1778d39..df687e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.jdbc
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.types.StructType
+
 import scala.collection.mutable.ArrayBuffer
 import java.sql.DriverManager
 
@@ -122,9 +126,9 @@ private[sql] case class JDBCRelation(
   extends BaseRelation
   with PrunedFilteredScan {
 
-  override val schema = JDBCRDD.resolveTable(url, table)
+  override val schema: StructType = JDBCRDD.resolveTable(url, table)
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = {
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
     JDBCRDD.scanTable(
       sqlContext.sparkContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index b645199..b1e363d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.json
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
 
 import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
 import org.apache.spark.sql.sources._
@@ -104,10 +106,10 @@ private[sql] case class JSONRelation(
         samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord)))
 
-  override def buildScan() =
+  override def buildScan(): RDD[Row] =
     JsonRDD.jsonStringToRow(baseRDD, schema, 
sqlContext.conf.columnNameOfCorruptRecord)
 
-  override def insert(data: DataFrame, overwrite: Boolean) = {
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val filesystemPath = new Path(path)
     val fs = 
filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 7d62f37..f898e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -488,7 +488,7 @@ private[parquet] object CatalystTimestampConverter {
   // Also we use NanoTime and Int96Values from parquet-examples.
   // We utilize jodd to convert between NanoTime and Timestamp
   val parquetTsCalendar = new ThreadLocal[Calendar]
-  def getCalendar = {
+  def getCalendar: Calendar = {
     // this is a cache for the calendar instance.
     if (parquetTsCalendar.get == null) {
       parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index fd161ba..fcb9513 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -71,16 +71,22 @@ private[sql] case class ParquetRelation(
       sqlContext.conf.isParquetINT96AsTimestamp)
   lazy val attributeMap = AttributeMap(output.map(o => o -> o))
 
-  override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+  override def newInstance(): this.type = {
+    ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
+  }
 
   // Equals must also take into account the output attributes so that we can 
distinguish between
   // different instances of the same relation,
-  override def equals(other: Any) = other match {
+  override def equals(other: Any): Boolean = other match {
     case p: ParquetRelation =>
       p.path == path && p.output == output
     case _ => false
   }
 
+  override def hashCode: Int = {
+    com.google.common.base.Objects.hashCode(path, output)
+  }
+
   // TODO: Use data from the footers.
   override lazy val statistics = Statistics(sizeInBytes = 
sqlContext.conf.defaultSizeInBytes)
 }

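The hashCode added to ParquetRelation (and, further down, to ParquetRelation2 and LogicalRelation) pairs with the custom equals these relations already define: objects that compare equal must hash alike, or hash-based lookups in the planner can miss them. A generic illustration of the pairing with Guava, using a hypothetical Rel class rather than the real relation:

    // Hypothetical stand-in showing the equals/hashCode pairing used in the patch.
    class Rel(val path: String, val output: Seq[String]) {
      override def equals(other: Any): Boolean = other match {
        case r: Rel => r.path == path && r.output == output
        case _ => false
      }
      // Hash exactly the fields that equals compares.
      override def hashCode: Int = com.google.common.base.Objects.hashCode(path, output)
    }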
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 62813a9..5130d8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -152,8 +152,8 @@ private[sql] case class ParquetTableScan(
 
         if (primitiveRow) {
           new Iterator[Row] {
-            def hasNext = iter.hasNext
-            def next() = {
+            def hasNext: Boolean = iter.hasNext
+            def next(): Row = {
               // We are using CatalystPrimitiveRowConverter and it returns a 
SpecificMutableRow.
               val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
 
@@ -171,8 +171,8 @@ private[sql] case class ParquetTableScan(
           // Create a mutable row since we need to fill in values from 
partition columns.
           val mutableRow = new GenericMutableRow(outputSize)
           new Iterator[Row] {
-            def hasNext = iter.hasNext
-            def next() = {
+            def hasNext: Boolean = iter.hasNext
+            def next(): Row = {
               // We are using CatalystGroupConverter and it returns a 
GenericRow.
               // Since GenericRow is not mutable, we just cast it to a Row.
               val row = iter.next()._2.asInstanceOf[Row]
@@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable(
   /**
    * Inserts all rows into the Parquet file.
    */
-  override def execute() = {
+  override def execute(): RDD[Row] = {
     // TODO: currently we do not check whether the "schema"s are compatible
     // That means if one first creates a table and then INSERTs data with
     // and incompatible schema the execution will fail. It would be nice
@@ -302,7 +302,7 @@ private[sql] case class InsertIntoParquetTable(
     childRdd
   }
 
-  override def output = child.output
+  override def output: Seq[Attribute] = child.output
 
   /**
    * Stores the given Row RDD as a Hadoop file.

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c38b6e8..10b8876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -181,7 +181,7 @@ private[sql] case class ParquetRelation2(
   private val defaultPartitionName = parameters.getOrElse(
     ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__")
 
-  override def equals(other: Any) = other match {
+  override def equals(other: Any): Boolean = other match {
     case relation: ParquetRelation2 =>
       // If schema merging is required, we don't compare the actual schemas 
since they may evolve.
       val schemaEquality = if (shouldMergeSchemas) {
@@ -198,6 +198,23 @@ private[sql] case class ParquetRelation2(
     case _ => false
   }
 
+  override def hashCode(): Int = {
+    if (shouldMergeSchemas) {
+      com.google.common.base.Objects.hashCode(
+        shouldMergeSchemas: java.lang.Boolean,
+        paths.toSet,
+        maybeMetastoreSchema,
+        maybePartitionSpec)
+    } else {
+      com.google.common.base.Objects.hashCode(
+        shouldMergeSchemas: java.lang.Boolean,
+        schema,
+        paths.toSet,
+        maybeMetastoreSchema,
+        maybePartitionSpec)
+    }
+  }
+
   private[sql] def sparkContext = sqlContext.sparkContext
 
   private class MetadataCache {
@@ -370,19 +387,19 @@ private[sql] case class ParquetRelation2(
   @transient private val metadataCache = new MetadataCache
   metadataCache.refresh()
 
-  def partitionSpec = metadataCache.partitionSpec
+  def partitionSpec: PartitionSpec = metadataCache.partitionSpec
 
-  def partitionColumns = metadataCache.partitionSpec.partitionColumns
+  def partitionColumns: StructType = metadataCache.partitionSpec.partitionColumns
 
-  def partitions = metadataCache.partitionSpec.partitions
+  def partitions: Seq[Partition] = metadataCache.partitionSpec.partitions
 
-  def isPartitioned = partitionColumns.nonEmpty
+  def isPartitioned: Boolean = partitionColumns.nonEmpty
 
   private def partitionKeysIncludedInDataSchema = 
metadataCache.partitionKeysIncludedInParquetSchema
 
   private def parquetSchema = metadataCache.parquetSchema
 
-  override def schema = metadataCache.schema
+  override def schema: StructType = metadataCache.schema
 
   private def isSummaryFile(file: Path): Boolean = {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
@@ -425,8 +442,10 @@ private[sql] case class ParquetRelation2(
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 
     if (isPartitioned) {
-      def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
-      logInfo(s"Reading $percentRead% of partitions")
+      logInfo {
+        val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
+        s"Reading $percentRead% of partitions"
+      }
     }
 
     val requiredColumns = output.map(_.name)
@@ -703,7 +722,7 @@ private[sql] object ParquetRelation2 {
   private[parquet] def mergeMetastoreParquetSchema(
       metastoreSchema: StructType,
       parquetSchema: StructType): StructType = {
-    def schemaConflictMessage =
+    def schemaConflictMessage: String =
       s"""Converting Hive Metastore Parquet, but detected conflicting schemas. 
Metastore schema:
          |${metastoreSchema.prettyJson}
          |

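One more pattern worth noting from the ParquetRelation2 hunk: the percentage computation moves inside the logInfo block. Spark's logInfo takes its message by name, so the division and string interpolation are skipped entirely when INFO logging is disabled. A self-contained sketch of that by-name pattern, with a stand-in logger and made-up partition lists rather than Spark's Logging trait:

    object ByNameLoggingSketch {
      private var infoEnabled = false

      // Stand-in for Logging.logInfo: msg is passed by name, so it is only
      // evaluated when the level is enabled.
      private def logInfo(msg: => String): Unit = if (infoEnabled) println(msg)

      def main(args: Array[String]): Unit = {
        val selectedPartitions = Seq("p=1")                  // hypothetical values
        val partitions = Seq("p=1", "p=2", "p=3", "p=4")
        logInfo {
          // Skipped entirely while infoEnabled is false.
          val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
          s"Reading $percentRead% of partitions"
        }
      }
    }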
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
index e244752..70bcca7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala
@@ -26,7 +26,7 @@ private[parquet] class NanoTime extends Serializable {
   private var julianDay = 0
   private var timeOfDayNanos = 0L
 
-  def set(julianDay: Int, timeOfDayNanos: Long) = {
+  def set(julianDay: Int, timeOfDayNanos: Long): this.type = {
     this.julianDay = julianDay
     this.timeOfDayNanos = timeOfDayNanos
     this
@@ -45,11 +45,11 @@ private[parquet] class NanoTime extends Serializable {
     Binary.fromByteBuffer(buf)
   }
 
-  def writeValue(recordConsumer: RecordConsumer) {
+  def writeValue(recordConsumer: RecordConsumer): Unit = {
     recordConsumer.addBinary(toBinary)
   }
 
-  override def toString =
+  override def toString: String =
     "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + 
"}"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
index 12b59ba..f374abf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala
@@ -30,24 +30,28 @@ private[sql] case class LogicalRelation(relation: BaseRelation)
   override val output: Seq[AttributeReference] = relation.schema.toAttributes
 
   // Logical Relations are distinct if they have different output for the sake 
of transformations.
-  override def equals(other: Any) = other match {
+  override def equals(other: Any): Boolean = other match {
     case l @ LogicalRelation(otherRelation) => relation == otherRelation && 
output == l.output
     case  _ => false
   }
 
-  override def sameResult(otherPlan: LogicalPlan) = otherPlan match {
+  override def hashCode: Int = {
+    com.google.common.base.Objects.hashCode(relation, output)
+  }
+
+  override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
     case LogicalRelation(otherRelation) => relation == otherRelation
     case _ => false
   }
 
-  @transient override lazy val statistics = Statistics(
+  @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = BigInt(relation.sizeInBytes)
   )
 
   /** Used to lookup original attribute capitalization */
-  val attributeMap = AttributeMap(output.map(o => (o, o)))
+  val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
 
-  def newInstance() = LogicalRelation(relation).asInstanceOf[this.type]
+  def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type]
 
-  override def simpleString = s"Relation[${output.mkString(",")}] $relation"
+  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 0e540da..9bbe06e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -27,7 +27,7 @@ private[sql] case class InsertIntoDataSource(
     overwrite: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = DataFrame(sqlContext, query)
     // Apply the schema of the existing table to the new data.

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 76754a6..d574066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -362,7 +362,7 @@ private[sql] case class CreateTableUsingAsSelect(
     mode: SaveMode,
     options: Map[String, String],
     child: LogicalPlan) extends UnaryNode {
-  override def output = Seq.empty[Attribute]
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
   // TODO: Override resolved after we support databaseName.
   // override lazy val resolved = databaseName != None && childrenResolved
 }
@@ -373,7 +373,7 @@ private[sql] case class CreateTempTableUsing(
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
-  def run(sqlContext: SQLContext) = {
+  def run(sqlContext: SQLContext): Seq[Row] = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, 
provider, options)
     sqlContext.registerDataFrameAsTable(
       DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
@@ -388,7 +388,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
 
-  def run(sqlContext: SQLContext) = {
+  def run(sqlContext: SQLContext): Seq[Row] = {
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
     sqlContext.registerDataFrameAsTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index cfa58f1..5a78001 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -53,7 +53,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
   def castAndRenameChildOutput(
       insertInto: InsertIntoTable,
       expectedOutput: Seq[Attribute],
-      child: LogicalPlan) = {
+      child: LogicalPlan): InsertIntoTable = {
     val newChildOutput = expectedOutput.zip(child.output).map {
       case (expected, actual) =>
         val needCast = !expected.dataType.sameType(actual.dataType)
@@ -79,7 +79,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
  * A rule to do various checks before inserting into or writing to a data 
source table.
  */
 private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan 
=> Unit) {
-  def failAnalysis(msg: String) = { throw new AnalysisException(msg) }
+  def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
