This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a10608c [SPARK-27680][CORE][SQL][GRAPHX] Remove usage of Traversable a10608c is described below commit a10608cb82907157b1858aa77eac585b1ed37baf Author: Sean Owen <sean.o...@databricks.com> AuthorDate: Tue May 14 09:14:56 2019 -0500 [SPARK-27680][CORE][SQL][GRAPHX] Remove usage of Traversable ## What changes were proposed in this pull request? This removes usage of `Traversable`, which is removed in Scala 2.13. This is mostly an internal change, except for the change in the `SparkConf.setAll` method. See additional comments below. ## How was this patch tested? Existing tests. Closes #24584 from srowen/SPARK-27680. Authored-by: Sean Owen <sean.o...@databricks.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- core/src/main/scala/org/apache/spark/SparkConf.scala | 9 +++++++++ core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/util/Distribution.scala | 8 ++++---- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2 +- .../org/apache/spark/kafka010/KafkaDelegationTokenTest.scala | 2 +- .../scala/org/apache/spark/graphx/lib/LabelPropagation.scala | 2 +- .../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala | 2 +- .../org/apache/spark/sql/catalyst/expressions/AttributeSet.scala | 4 ++-- .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 2 +- .../apache/spark/sql/catalyst/expressions/codegen/javaCode.scala | 2 +- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 8 ++++---- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- 13 files changed, 28 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 913a170..240707e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -168,6 +168,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } /** Set multiple parameters together */ + def setAll(settings: Iterable[(String, String)]): SparkConf = { + settings.foreach { case (k, v) => set(k, v) } + this + } + + /** + * Set multiple parameters together + */ + @deprecated("Use setAll(Iterable) instead", "3.0.0") def setAll(settings: Traversable[(String, String)]): SparkConf = { settings.foreach { case (k, v) => set(k, v) } this diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8b74435..9979410 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2556,7 +2556,7 @@ object SparkContext extends Logging { private[spark] val DRIVER_IDENTIFIER = "driver" - private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Traversable[T]) + private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Iterable[T]) : ArrayWritable = { def anyToWritable[U <: Writable](u: U): Writable = u diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala index 950b69f..240dcfb 100644 --- a/core/src/main/scala/org/apache/spark/util/Distribution.scala +++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala @@ -31,7 +31,7 @@ import scala.collection.immutable.IndexedSeq */ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) { require(startIdx < endIdx) - def this(data: Traversable[Double]) = this(data.toArray, 0, data.size) + def this(data: Iterable[Double]) = this(data.toArray, 0, data.size) java.util.Arrays.sort(data, startIdx, endIdx) val length = endIdx - startIdx @@ -42,7 +42,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va * given from 0 to 1 * @param probabilities */ - def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) + def getQuantiles(probabilities: Iterable[Double] = defaultProbabilities) : IndexedSeq[Double] = { probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) } } @@ -75,7 +75,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va private[spark] object Distribution { - def apply(data: Traversable[Double]): Option[Distribution] = { + def apply(data: Iterable[Double]): Option[Distribution] = { if (data.size > 0) { Some(new Distribution(data)) } else { @@ -83,7 +83,7 @@ private[spark] object Distribution { } } - def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) { + def showQuantiles(out: PrintStream = System.out, quantiles: Iterable[Double]) { // scalastyle:off println out.println("min\t25%\t50%\t75%\tmax") quantiles.foreach{q => out.print(q + "\t")} diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d84dd58..bab1d5f 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -309,7 +309,7 @@ private[spark] object JsonProtocol { private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses") - def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = { + def accumulablesToJson(accumulables: Iterable[AccumulableInfo]): JArray = { JArray(accumulables .filterNot(_.name.exists(accumulableBlacklist.contains)) .toList.map(accumulableInfoToJson)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 2940aef..a7869d3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -535,7 +535,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match /** * Assert that the given list of numbers has an average that is greater than zero. */ - private def checkNonZeroAvg(m: Traversable[Long], msg: String) { + private def checkNonZeroAvg(m: Iterable[Long], msg: String) { assert(m.sum / m.size.toDouble > 0.0, msg) } diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index ac59c61..2675085 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -83,7 +83,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { UserGroupInformation.getCurrentUser.addCredentials(creds) } - protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = { + protected def setSparkEnv(settings: Iterable[(String, String)]): Unit = { val conf = new SparkConf().setAll(settings) val env = mock(classOf[SparkEnv]) doReturn(conf).when(env).conf diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala index cb3025f..507d21d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala @@ -55,7 +55,7 @@ object LabelPropagation { val count1Val = count1.getOrElse(i, 0L) val count2Val = count2.getOrElse(i, 0L) i -> (count1Val + count2Val) - }(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]] + }(collection.breakOut) } def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = { if (message.isEmpty) attr else message.maxBy(_._2)._1 diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala index aff0b93..9edfe58 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala @@ -36,7 +36,7 @@ object ShortestPaths extends Serializable { private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = { (spmap1.keySet ++ spmap2.keySet).map { k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) - }(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]] + }(collection.breakOut) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala index a7e09ee..038ebb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala @@ -63,7 +63,7 @@ object AttributeSet { * when the transformation was a no-op). */ class AttributeSet private (val baseSet: Set[AttributeEquals]) - extends Traversable[Attribute] with Serializable { + extends Iterable[Attribute] with Serializable { override def hashCode: Int = baseSet.hashCode() @@ -99,7 +99,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals]) * Returns a new [[AttributeSet]] that does not contain any of the [[Attribute Attributes]] found * in `other`. */ - def --(other: Traversable[NamedExpression]): AttributeSet = { + def --(other: Iterable[NamedExpression]): AttributeSet = { other match { case otherSet: AttributeSet => new AttributeSet(baseSet -- otherSet.baseSet) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 2cd84b5..3529671 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -253,7 +253,7 @@ abstract class Expression extends TreeNode[Expression] { def prettyName: String = nodeName.toLowerCase(Locale.ROOT) protected def flatArguments: Iterator[Any] = productIterator.flatMap { - case t: Traversable[_] => t + case t: Iterable[_] => t case single => single :: Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala index 80999ef..c3b7990 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala @@ -183,7 +183,7 @@ trait Block extends TreeNode[Block] with JavaCode { def doTransform(arg: Any): AnyRef = arg match { case e: ExprValue => transform(e) case Some(value) => Some(doTransform(value)) - case seq: Traversable[_] => seq.map(doTransform) + case seq: Iterable[_] => seq.map(doTransform) case other: AnyRef => other } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 3d4de5c..a6968c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -119,7 +119,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT case m: Map[_, _] => m case d: DataType => d // Avoid unpacking Structs case stream: Stream[_] => stream.map(recursiveTransform).force - case seq: Traversable[_] => seq.map(recursiveTransform) + case seq: Iterable[_] => seq.map(recursiveTransform) case other: AnyRef => other case null => null } @@ -142,16 +142,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT /** Returns all of the expressions present in this query plan operator. */ final def expressions: Seq[Expression] = { // Recursively find all expressions from a traversable. - def seqToExpressions(seq: Traversable[Any]): Traversable[Expression] = seq.flatMap { + def seqToExpressions(seq: Iterable[Any]): Iterable[Expression] = seq.flatMap { case e: Expression => e :: Nil - case s: Traversable[_] => seqToExpressions(s) + case s: Iterable[_] => seqToExpressions(s) case other => Nil } productIterator.flatMap { case e: Expression => e :: Nil case s: Some[_] => seqToExpressions(s.toSeq) - case seq: Traversable[_] => seqToExpressions(seq) + case seq: Iterable[_] => seqToExpressions(seq) case other => Nil }.toSeq } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 66342af..84ca066 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -353,7 +353,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { }.view.force // `mapValues` is lazy and we need to force it to materialize case d: DataType => d // Avoid unpacking Structs case args: Stream[_] => args.map(mapChild).force // Force materialization on stream - case args: Traversable[_] => args.map(mapChild) + case args: Iterable[_] => args.map(mapChild) case nonChild: AnyRef => nonChild case null => null } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org