Repository: spark Updated Branches: refs/heads/master 257cde7c3 -> a95043b17
http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index ffe388c..e916e68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute + import scala.collection.mutable.HashSet -import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} +import org.apache.spark.{AccumulatorParam, Accumulator} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.SparkContext._ import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -43,7 +45,7 @@ package object debug { * Augments [[SQLContext]] with debug methods. */ implicit class DebugSQLContext(sqlContext: SQLContext) { - def debug() = { + def debug(): Unit = { sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") } } @@ -88,7 +90,7 @@ package object debug { } private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode { - def output = child.output + def output: Seq[Attribute] = child.output implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] { def zero(initialValue: HashSet[String]): HashSet[String] = { @@ -109,10 +111,10 @@ package object debug { */ case class ColumnMetrics( elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty)) - val tupleCount = sparkContext.accumulator[Int](0) + val tupleCount: Accumulator[Int] = sparkContext.accumulator[Int](0) - val numColumns = child.output.size - val columnStats = Array.fill(child.output.size)(new ColumnMetrics()) + val numColumns: Int = child.output.size + val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics()) def dumpStats(): Unit = { println(s"== ${child.simpleString} ==") @@ -123,11 +125,11 @@ package object debug { } } - def execute() = { + def execute(): RDD[Row] = { child.execute().mapPartitions { iter => new Iterator[Row] { - def hasNext = iter.hasNext - def next() = { + def hasNext: Boolean = iter.hasNext + def next(): Row = { val currentRow = iter.next() tupleCount += 1 var i = 0 @@ -180,18 +182,18 @@ package object debug { private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan { import TypeCheck._ - override def nodeName = "" + override def nodeName: String = "" /* Only required when defining this class in a REPL. override def makeCopy(args: Array[Object]): this.type = TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type] */ - def output = child.output + def output: Seq[Attribute] = child.output - def children = child :: Nil + def children: List[SparkPlan] = child :: Nil - def execute() = { + def execute(): RDD[Row] = { child.execute().map { row => try typeCheck(row, child.schema) catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 2dd22c0..926f5e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.execution.joins +import org.apache.spark.rdd.RDD + import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Row, Expression} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} /** @@ -42,7 +44,7 @@ case class BroadcastHashJoin( right: SparkPlan) extends BinaryNode with HashJoin { - val timeout = { + val timeout: Duration = { val timeoutValue = sqlContext.conf.broadcastTimeout if (timeoutValue < 0) { Duration.Inf @@ -53,7 +55,7 @@ case class BroadcastHashJoin( override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[Distribution] = UnspecifiedDistribution :: UnspecifiedDistribution :: Nil @transient @@ -64,7 +66,7 @@ case class BroadcastHashJoin( sparkContext.broadcast(hashed) } - override def execute() = { + override def execute(): RDD[Row] = { val broadcastRelation = Await.result(broadcastFuture, timeout) streamedPlan.execute().mapPartitions { streamedIter => http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index 2ab064f..3ef1e0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.{Expression, Row} -import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} /** @@ -34,11 +34,11 @@ case class BroadcastLeftSemiJoinHash( left: SparkPlan, right: SparkPlan) extends BinaryNode with HashJoin { - override val buildSide = BuildRight + override val buildSide: BuildSide = BuildRight - override def output = left.output + override def output: Seq[Attribute] = left.output - override def execute() = { + override def execute(): RDD[Row] = { val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 36aad13..83b1a83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter} @@ -44,7 +45,7 @@ case class BroadcastNestedLoopJoin( override def outputPartitioning: Partitioning = streamed.outputPartitioning - override def output = { + override def output: Seq[Attribute] = { joinType match { case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) @@ -63,7 +64,7 @@ case class BroadcastNestedLoopJoin( .map(c => BindReferences.bindReference(c, left.output ++ right.output)) .getOrElse(Literal(true))) - override def execute() = { + override def execute(): RDD[Row] = { val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index 76c14c0..1cbc983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} /** @@ -26,9 +28,9 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} */ @DeveloperApi case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { - override def output = left.output ++ right.output + override def output: Seq[Attribute] = left.output ++ right.output - override def execute() = { + override def execute(): RDD[Row] = { val leftResults = left.execute().map(_.copy()) val rightResults = right.execute().map(_.copy()) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 4012d75..851de16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -41,7 +41,7 @@ trait HashJoin { case BuildRight => (rightKeys, leftKeys) } - override def output = left.output ++ right.output + override def output: Seq[Attribute] = left.output ++ right.output @transient protected lazy val buildSideKeyGenerator: Projection = newProjection(buildKeys, buildPlan.output) @@ -65,7 +65,7 @@ trait HashJoin { (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) || (streamIter.hasNext && fetchNext()) - override final def next() = { + override final def next(): Row = { val ret = buildSide match { case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 59ef904..a396c0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.joins import java.util.{HashMap => JavaHashMap} +import org.apache.spark.rdd.RDD + import scala.collection.JavaConversions._ import org.apache.spark.annotation.DeveloperApi @@ -49,10 +51,10 @@ case class HashOuterJoin( case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") } - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[ClusteredDistribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def output = { + override def output: Seq[Attribute] = { joinType match { case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) @@ -78,12 +80,12 @@ case class HashOuterJoin( private[this] def leftOuterIterator( key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = { - val ret: Iterable[Row] = ( + val ret: Iterable[Row] = { if (!key.anyNull) { val temp = rightIter.collect { - case r if (boundCondition(joinedRow.withRight(r))) => joinedRow.copy + case r if boundCondition(joinedRow.withRight(r)) => joinedRow.copy() } - if (temp.size == 0) { + if (temp.size == 0) { joinedRow.withRight(rightNullRow).copy :: Nil } else { temp @@ -91,19 +93,19 @@ case class HashOuterJoin( } else { joinedRow.withRight(rightNullRow).copy :: Nil } - ) + } ret.iterator } private[this] def rightOuterIterator( key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = { - val ret: Iterable[Row] = ( + val ret: Iterable[Row] = { if (!key.anyNull) { val temp = leftIter.collect { - case l if (boundCondition(joinedRow.withLeft(l))) => joinedRow.copy + case l if boundCondition(joinedRow.withLeft(l)) => joinedRow.copy } - if (temp.size == 0) { + if (temp.size == 0) { joinedRow.withLeft(leftNullRow).copy :: Nil } else { temp @@ -111,7 +113,7 @@ case class HashOuterJoin( } else { joinedRow.withLeft(leftNullRow).copy :: Nil } - ) + } ret.iterator } @@ -130,12 +132,12 @@ case class HashOuterJoin( // 1. For those matched (satisfy the join condition) records with both sides filled, // append them directly - case (r, idx) if (boundCondition(joinedRow.withRight(r)))=> { + case (r, idx) if boundCondition(joinedRow.withRight(r)) => matched = true // if the row satisfy the join condition, add its index into the matched set rightMatchedSet.add(idx) - joinedRow.copy - } + joinedRow.copy() + } ++ DUMMY_LIST.filter(_ => !matched).map( _ => { // 2. For those unmatched records in left, append additional records with empty right. @@ -143,22 +145,21 @@ case class HashOuterJoin( // as we don't know whether we need to append it until finish iterating all // of the records in right side. // If we didn't get any proper row, then append a single row with empty right. - joinedRow.withRight(rightNullRow).copy + joinedRow.withRight(rightNullRow).copy() }) } ++ rightIter.zipWithIndex.collect { // 3. For those unmatched records in right, append additional records with empty left. // Re-visiting the records in right, and append additional row with empty left, if its not // in the matched set. - case (r, idx) if (!rightMatchedSet.contains(idx)) => { - joinedRow(leftNullRow, r).copy - } + case (r, idx) if !rightMatchedSet.contains(idx) => + joinedRow(leftNullRow, r).copy() } } else { leftIter.iterator.map[Row] { l => - joinedRow(l, rightNullRow).copy + joinedRow(l, rightNullRow).copy() } ++ rightIter.iterator.map[Row] { r => - joinedRow(leftNullRow, r).copy + joinedRow(leftNullRow, r).copy() } } } @@ -182,13 +183,13 @@ case class HashOuterJoin( hashTable } - override def execute() = { + override def execute(): RDD[Row] = { val joinedRow = new JoinedRow() left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => // TODO this probably can be replaced by external sort (sort merged join?) joinType match { - case LeftOuter => { + case LeftOuter => val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output)) val keyGenerator = newProjection(leftKeys, left.output) leftIter.flatMap( currentRow => { @@ -196,8 +197,8 @@ case class HashOuterJoin( joinedRow.withLeft(currentRow) leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST)) }) - } - case RightOuter => { + + case RightOuter => val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output)) val keyGenerator = newProjection(rightKeys, right.output) rightIter.flatMap ( currentRow => { @@ -205,8 +206,8 @@ case class HashOuterJoin( joinedRow.withRight(currentRow) rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow) }) - } - case FullOuter => { + + case FullOuter => val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output)) val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output)) (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key => @@ -214,7 +215,7 @@ case class HashOuterJoin( leftHashTable.getOrElse(key, EMPTY_LIST), rightHashTable.getOrElse(key, EMPTY_LIST), joinedRow) } - } + case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") } } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 38b8993..2fa1cf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -38,7 +38,7 @@ private[joins] sealed trait HashedRelation { private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, CompactBuffer[Row]]) extends HashedRelation with Serializable { - override def get(key: Row) = hashTable.get(key) + override def get(key: Row): CompactBuffer[Row] = hashTable.get(key) } @@ -49,7 +49,7 @@ private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, Com private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, Row]) extends HashedRelation with Serializable { - override def get(key: Row) = { + override def get(key: Row): CompactBuffer[Row] = { val v = hashTable.get(key) if (v eq null) null else CompactBuffer(v) } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala index 60003d1..1fa7e7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -35,12 +36,13 @@ case class LeftSemiJoinBNL( override def outputPartitioning: Partitioning = streamed.outputPartitioning - override def output = left.output + override def output: Seq[Attribute] = left.output /** The Streamed Relation */ - override def left = streamed + override def left: SparkPlan = streamed + /** The Broadcast relation */ - override def right = broadcast + override def right: SparkPlan = broadcast @transient private lazy val boundCondition = InterpretedPredicate( @@ -48,7 +50,7 @@ case class LeftSemiJoinBNL( .map(c => BindReferences.bindReference(c, left.output ++ right.output)) .getOrElse(Literal(true))) - override def execute() = { + override def execute(): RDD[Row] = { val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index ea7babf..a04f2a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.{Expression, Row} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -34,14 +35,14 @@ case class LeftSemiJoinHash( left: SparkPlan, right: SparkPlan) extends BinaryNode with HashJoin { - override val buildSide = BuildRight + override val buildSide: BuildSide = BuildRight - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[ClusteredDistribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def output = left.output + override def output: Seq[Attribute] = left.output - override def execute() = { + override def execute(): RDD[Row] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 418c1c2..a6cd833 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -38,10 +40,10 @@ case class ShuffledHashJoin( override def outputPartitioning: Partitioning = left.outputPartitioning - override def requiredChildDistribution = + override def requiredChildDistribution: Seq[ClusteredDistribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def execute() = { + override def execute(): RDD[Row] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashed = HashedRelation(buildIter, buildSideKeyGenerator) hashJoin(streamIter, hashed) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 33632b8..5b308d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution import java.util.{List => JList, Map => JMap} +import org.apache.spark.rdd.RDD + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -48,11 +50,13 @@ private[spark] case class PythonUDF( dataType: DataType, children: Seq[Expression]) extends Expression with SparkLogging { - override def toString = s"PythonUDF#$name(${children.mkString(",")})" + override def toString: String = s"PythonUDF#$name(${children.mkString(",")})" def nullable: Boolean = true - override def eval(input: Row) = sys.error("PythonUDFs can not be directly evaluated.") + override def eval(input: Row): PythonUDF.this.EvaluatedType = { + sys.error("PythonUDFs can not be directly evaluated.") + } } /** @@ -63,7 +67,7 @@ private[spark] case class PythonUDF( * multiple child operators. */ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan) = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Skip EvaluatePython nodes. case p: EvaluatePython => p @@ -107,7 +111,7 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] { } object EvaluatePython { - def apply(udf: PythonUDF, child: LogicalPlan) = + def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython = new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)()) /** @@ -205,10 +209,10 @@ case class EvaluatePython( resultAttribute: AttributeReference) extends logical.UnaryNode { - def output = child.output :+ resultAttribute + def output: Seq[Attribute] = child.output :+ resultAttribute // References should not include the produced attribute. - override def references = udf.references + override def references: AttributeSet = udf.references } /** @@ -219,9 +223,10 @@ case class EvaluatePython( @DeveloperApi case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan) extends SparkPlan { - def children = child :: Nil - def execute() = { + def children: Seq[SparkPlan] = child :: Nil + + def execute(): RDD[Row] = { // TODO: Clean up after ourselves? val childResults = child.execute().map(_.copy()).cache() http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 87304ce..3266b97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -306,7 +306,8 @@ private[sql] class JDBCRDD( /** * Runs the SQL query against the JDBC driver. */ - override def compute(thePart: Partition, context: TaskContext) = new Iterator[Row] { + override def compute(thePart: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] + { var closed = false var finished = false var gotNext = false http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index 1778d39..df687e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.jdbc +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.types.StructType + import scala.collection.mutable.ArrayBuffer import java.sql.DriverManager @@ -122,9 +126,9 @@ private[sql] case class JDBCRelation( extends BaseRelation with PrunedFilteredScan { - override val schema = JDBCRDD.resolveTable(url, table) + override val schema: StructType = JDBCRDD.resolveTable(url, table) - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = { + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName JDBCRDD.scanTable( sqlContext.sparkContext, http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index b645199..b1e363d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.json import java.io.IOException import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} import org.apache.spark.sql.sources._ @@ -104,10 +106,10 @@ private[sql] case class JSONRelation( samplingRatio, sqlContext.conf.columnNameOfCorruptRecord))) - override def buildScan() = + override def buildScan(): RDD[Row] = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) - override def insert(data: DataFrame, overwrite: Boolean) = { + override def insert(data: DataFrame, overwrite: Boolean): Unit = { val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 7d62f37..f898e4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -488,7 +488,7 @@ private[parquet] object CatalystTimestampConverter { // Also we use NanoTime and Int96Values from parquet-examples. // We utilize jodd to convert between NanoTime and Timestamp val parquetTsCalendar = new ThreadLocal[Calendar] - def getCalendar = { + def getCalendar: Calendar = { // this is a cache for the calendar instance. if (parquetTsCalendar.get == null) { parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))) http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index fd161ba..fcb9513 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -71,16 +71,22 @@ private[sql] case class ParquetRelation( sqlContext.conf.isParquetINT96AsTimestamp) lazy val attributeMap = AttributeMap(output.map(o => o -> o)) - override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] + override def newInstance(): this.type = { + ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] + } // Equals must also take into account the output attributes so that we can distinguish between // different instances of the same relation, - override def equals(other: Any) = other match { + override def equals(other: Any): Boolean = other match { case p: ParquetRelation => p.path == path && p.output == output case _ => false } + override def hashCode: Int = { + com.google.common.base.Objects.hashCode(path, output) + } + // TODO: Use data from the footers. override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes) } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 62813a9..5130d8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -152,8 +152,8 @@ private[sql] case class ParquetTableScan( if (primitiveRow) { new Iterator[Row] { - def hasNext = iter.hasNext - def next() = { + def hasNext: Boolean = iter.hasNext + def next(): Row = { // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow. val row = iter.next()._2.asInstanceOf[SpecificMutableRow] @@ -171,8 +171,8 @@ private[sql] case class ParquetTableScan( // Create a mutable row since we need to fill in values from partition columns. val mutableRow = new GenericMutableRow(outputSize) new Iterator[Row] { - def hasNext = iter.hasNext - def next() = { + def hasNext: Boolean = iter.hasNext + def next(): Row = { // We are using CatalystGroupConverter and it returns a GenericRow. // Since GenericRow is not mutable, we just cast it to a Row. val row = iter.next()._2.asInstanceOf[Row] @@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable( /** * Inserts all rows into the Parquet file. */ - override def execute() = { + override def execute(): RDD[Row] = { // TODO: currently we do not check whether the "schema"s are compatible // That means if one first creates a table and then INSERTs data with // and incompatible schema the execution will fail. It would be nice @@ -302,7 +302,7 @@ private[sql] case class InsertIntoParquetTable( childRdd } - override def output = child.output + override def output: Seq[Attribute] = child.output /** * Stores the given Row RDD as a Hadoop file. http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c38b6e8..10b8876 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -181,7 +181,7 @@ private[sql] case class ParquetRelation2( private val defaultPartitionName = parameters.getOrElse( ParquetRelation2.DEFAULT_PARTITION_NAME, "__HIVE_DEFAULT_PARTITION__") - override def equals(other: Any) = other match { + override def equals(other: Any): Boolean = other match { case relation: ParquetRelation2 => // If schema merging is required, we don't compare the actual schemas since they may evolve. val schemaEquality = if (shouldMergeSchemas) { @@ -198,6 +198,23 @@ private[sql] case class ParquetRelation2( case _ => false } + override def hashCode(): Int = { + if (shouldMergeSchemas) { + com.google.common.base.Objects.hashCode( + shouldMergeSchemas: java.lang.Boolean, + paths.toSet, + maybeMetastoreSchema, + maybePartitionSpec) + } else { + com.google.common.base.Objects.hashCode( + shouldMergeSchemas: java.lang.Boolean, + schema, + paths.toSet, + maybeMetastoreSchema, + maybePartitionSpec) + } + } + private[sql] def sparkContext = sqlContext.sparkContext private class MetadataCache { @@ -370,19 +387,19 @@ private[sql] case class ParquetRelation2( @transient private val metadataCache = new MetadataCache metadataCache.refresh() - def partitionSpec = metadataCache.partitionSpec + def partitionSpec: PartitionSpec = metadataCache.partitionSpec - def partitionColumns = metadataCache.partitionSpec.partitionColumns + def partitionColumns: StructType = metadataCache.partitionSpec.partitionColumns - def partitions = metadataCache.partitionSpec.partitions + def partitions: Seq[Partition] = metadataCache.partitionSpec.partitions - def isPartitioned = partitionColumns.nonEmpty + def isPartitioned: Boolean = partitionColumns.nonEmpty private def partitionKeysIncludedInDataSchema = metadataCache.partitionKeysIncludedInParquetSchema private def parquetSchema = metadataCache.parquetSchema - override def schema = metadataCache.schema + override def schema: StructType = metadataCache.schema private def isSummaryFile(file: Path): Boolean = { file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || @@ -425,8 +442,10 @@ private[sql] case class ParquetRelation2( .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _)) if (isPartitioned) { - def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100 - logInfo(s"Reading $percentRead% of partitions") + logInfo { + val percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100 + s"Reading $percentRead% of partitions" + } } val requiredColumns = output.map(_.name) @@ -703,7 +722,7 @@ private[sql] object ParquetRelation2 { private[parquet] def mergeMetastoreParquetSchema( metastoreSchema: StructType, parquetSchema: StructType): StructType = { - def schemaConflictMessage = + def schemaConflictMessage: String = s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema: |${metastoreSchema.prettyJson} | http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala index e244752..70bcca7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala @@ -26,7 +26,7 @@ private[parquet] class NanoTime extends Serializable { private var julianDay = 0 private var timeOfDayNanos = 0L - def set(julianDay: Int, timeOfDayNanos: Long) = { + def set(julianDay: Int, timeOfDayNanos: Long): this.type = { this.julianDay = julianDay this.timeOfDayNanos = timeOfDayNanos this @@ -45,11 +45,11 @@ private[parquet] class NanoTime extends Serializable { Binary.fromByteBuffer(buf) } - def writeValue(recordConsumer: RecordConsumer) { + def writeValue(recordConsumer: RecordConsumer): Unit = { recordConsumer.addBinary(toBinary) } - override def toString = + override def toString: String = "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}" } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala index 12b59ba..f374abf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala @@ -30,24 +30,28 @@ private[sql] case class LogicalRelation(relation: BaseRelation) override val output: Seq[AttributeReference] = relation.schema.toAttributes // Logical Relations are distinct if they have different output for the sake of transformations. - override def equals(other: Any) = other match { + override def equals(other: Any): Boolean = other match { case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output case _ => false } - override def sameResult(otherPlan: LogicalPlan) = otherPlan match { + override def hashCode: Int = { + com.google.common.base.Objects.hashCode(relation, output) + } + + override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { case LogicalRelation(otherRelation) => relation == otherRelation case _ => false } - @transient override lazy val statistics = Statistics( + @transient override lazy val statistics: Statistics = Statistics( sizeInBytes = BigInt(relation.sizeInBytes) ) /** Used to lookup original attribute capitalization */ - val attributeMap = AttributeMap(output.map(o => (o, o))) + val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o))) - def newInstance() = LogicalRelation(relation).asInstanceOf[this.type] + def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type] - override def simpleString = s"Relation[${output.mkString(",")}] $relation" + override def simpleString: String = s"Relation[${output.mkString(",")}] $relation" } http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 0e540da..9bbe06e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -27,7 +27,7 @@ private[sql] case class InsertIntoDataSource( overwrite: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] val data = DataFrame(sqlContext, query) // Apply the schema of the existing table to the new data. http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 76754a6..d574066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -362,7 +362,7 @@ private[sql] case class CreateTableUsingAsSelect( mode: SaveMode, options: Map[String, String], child: LogicalPlan) extends UnaryNode { - override def output = Seq.empty[Attribute] + override def output: Seq[Attribute] = Seq.empty[Attribute] // TODO: Override resolved after we support databaseName. // override lazy val resolved = databaseName != None && childrenResolved } @@ -373,7 +373,7 @@ private[sql] case class CreateTempTableUsing( provider: String, options: Map[String, String]) extends RunnableCommand { - def run(sqlContext: SQLContext) = { + def run(sqlContext: SQLContext): Seq[Row] = { val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) sqlContext.registerDataFrameAsTable( DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) @@ -388,7 +388,7 @@ private[sql] case class CreateTempTableUsingAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { - def run(sqlContext: SQLContext) = { + def run(sqlContext: SQLContext): Seq[Row] = { val df = DataFrame(sqlContext, query) val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df) sqlContext.registerDataFrameAsTable( http://git-wip-us.apache.org/repos/asf/spark/blob/a95043b1/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index cfa58f1..5a78001 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -53,7 +53,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { def castAndRenameChildOutput( insertInto: InsertIntoTable, expectedOutput: Seq[Attribute], - child: LogicalPlan) = { + child: LogicalPlan): InsertIntoTable = { val newChildOutput = expectedOutput.zip(child.output).map { case (expected, actual) => val needCast = !expected.dataType.sameType(actual.dataType) @@ -79,7 +79,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { * A rule to do various checks before inserting into or writing to a data source table. */ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { - def failAnalysis(msg: String) = { throw new AnalysisException(msg) } + def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { plan.foreach { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org